• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.38
/src/python/pants/engine/streaming_workunit_handler.py
1
# Copyright 2019 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
1✔
5

6
import logging
1✔
7
import threading
1✔
8
from abc import ABC, abstractmethod
1✔
9
from collections.abc import Callable, Iterable, Sequence
1✔
10
from dataclasses import dataclass
1✔
11
from typing import Any
1✔
12

13
from pants.base.specs import Specs
1✔
14
from pants.core.environments.rules import determine_bootstrap_environment
1✔
15
from pants.engine.addresses import Addresses
1✔
16
from pants.engine.environment import EnvironmentName
1✔
17
from pants.engine.fs import Digest, DigestContents, FileDigest, Snapshot
1✔
18
from pants.engine.internals.native_engine import PyThreadLocals
1✔
19
from pants.engine.internals.scheduler import SchedulerSession, Workunit
1✔
20
from pants.engine.internals.selectors import Params, concurrently
1✔
21
from pants.engine.rules import QueryRule, collect_rules, implicitly, rule
1✔
22
from pants.engine.target import Targets
1✔
23
from pants.engine.unions import UnionMembership, union
1✔
24
from pants.goal.run_tracker import RunTracker
1✔
25
from pants.option.options_bootstrapper import OptionsBootstrapper
1✔
26
from pants.util.logging import LogLevel
1✔
27
from pants.util.strutil import softwrap
1✔
28

29
logger = logging.getLogger(__name__)
1✔
30

31

32
# -----------------------------------------------------------------------------------------------
33
# Streaming workunits plugin API
34
# -----------------------------------------------------------------------------------------------
35

36

37
def thread_locals_get_for_current_thread() -> PyThreadLocals:
    """Fetch the engine's thread-local state for the calling thread.

    StreamingWorkunit plugins that spawn extra threads should capture this state on the
    thread they were initialized on and re-apply it (via
    `thread_locals_set_for_current_thread`) on each spawned thread, so that
    StreamingWorkunitContext methods remain safe to use there.
    """
    current_locals = PyThreadLocals.get_for_current_thread()
    return current_locals
45

46

47
def thread_locals_set_for_current_thread(thread_locals: PyThreadLocals) -> None:
    """Install the given engine thread-local state on the calling thread.

    Counterpart to `thread_locals_get_for_current_thread`.
    """
    thread_locals.set_for_current_thread()
53

54

55
@dataclass(frozen=True)
class TargetInfo:
    """A single file (or target address) that a spec expanded to."""

    # Path of the file target, or the stringified address for non-file targets.
    filename: str
58

59

60
@dataclass(frozen=True)
class ExpandedSpecs:
    """Mapping from each spec'd address (as a string) to the targets it expanded to."""

    # Keyed by the canonicalized address string of the original spec.
    targets: dict[str, list[TargetInfo]]
63

64

65
@dataclass(frozen=True)
class StreamingWorkunitContext:
    """Run-scoped functionality handed to each WorkunitsCallback invocation.

    Wraps the (isolated) scheduler session plus run metadata so that plugins can
    resolve digests, read metrics, and expand the run's specs without direct access
    to engine internals.
    """

    # Isolated SchedulerSession used for all engine queries made by callbacks.
    _scheduler: SchedulerSession
    # Tracker for the current Pants run (exposed via the `run_tracker` property).
    _run_tracker: RunTracker
    # The specs the user passed for this run; consumed by `get_expanded_specs`.
    _specs: Specs
    # Needed as a Param when executing spec-expansion queries.
    _options_bootstrapper: OptionsBootstrapper

    @property
    def run_tracker(self) -> RunTracker:
        """Returns the RunTracker for the current run of Pants."""
        return self._run_tracker

    def single_file_digests_to_bytes(self, digests: Sequence[FileDigest]) -> list[bytes]:
        """Return `bytes` for each `FileDigest`."""
        return self._scheduler.single_file_digests_to_bytes(digests)

    def snapshots_to_file_contents(
        self, snapshots: Sequence[Snapshot]
    ) -> tuple[DigestContents, ...]:
        """Given a sequence of Snapshot objects, return a tuple of DigestContents representing the
        files contained in those `Snapshot`s in sequence."""
        return self._scheduler.snapshots_to_file_contents(snapshots)

    def ensure_remote_has_recursive(self, digests: Sequence[Digest | FileDigest]) -> None:
        """Invoke the internal ensure_remote_has_recursive function, which ensures that a remote
        ByteStore, if it exists, has a copy of the files fingerprinted by each Digest."""
        return self._scheduler.ensure_remote_has_recursive(digests)

    def get_metrics(self) -> dict[str, int]:
        """Invoke the internal get_metrics function, which returns metrics for the Session."""
        return self._scheduler.get_metrics()

    def get_observation_histograms(self) -> dict[str, Any]:
        """Invoke the internal get_observation_histograms function, which serializes histograms
        generated from Pants-internal observation metrics observed during the current run of Pants.

        These metrics are useful for debugging Pants internals.
        """
        return self._scheduler.get_observation_histograms()

    def get_expanded_specs(self) -> ExpandedSpecs:
        """Return a dict containing the canonicalized addresses of the specs for this run, and what
        files they expand to."""

        params = Params(
            self._specs,
            self._options_bootstrapper,
            determine_bootstrap_environment(self._scheduler),
        )
        # Resolve both the un-expanded addresses and the fully-expanded targets for the
        # same specs in a single execution request.
        request = self._scheduler.execution_request([(Addresses, params), (Targets, params)])
        unexpanded_addresses, expanded_targets = self._scheduler.execute(request)

        # Seed the result with one (initially empty) bucket per spec'd address.
        targets_dict: dict[str, list[TargetInfo]] = {str(addr): [] for addr in unexpanded_addresses}
        for target in expanded_targets:
            target_spec = str(target.address.spec)
            source = targets_dict.get(target_spec)
            if source is None:
                # Generated targets won't match a spec'd address directly; fall back to
                # their target generator's address.
                target_gen_spec = str(target.address.maybe_convert_to_target_generator())
                source = targets_dict.get(target_gen_spec)
                if source is None:
                    # Unexpected: the target maps to neither a spec'd address nor its
                    # generator. Not fully understood; this merely patches over a crash
                    # by creating a new bucket. See #18564.
                    logger.warning(
                        softwrap(
                            f"""
                            Unknown source address for target: {target_spec}
                            Target address generator: {target_gen_spec}

                            Input params:
                            {params}

                            Unexpanded addresses:
                            {unexpanded_addresses}

                            Expanded targets:
                            {expanded_targets}
                            """
                        )
                    )
                    targets_dict[target_gen_spec or target_spec] = source = []

            source.append(
                TargetInfo(
                    filename=(
                        target.address.filename
                        if target.address.is_file_target
                        else str(target.address)
                    )
                )
            )
        return ExpandedSpecs(targets=targets_dict)
156

157

158
class WorkunitsCallback(ABC):
    """Interface for plugins that consume streamed workunit data during a run."""

    @abstractmethod
    def __call__(
        self,
        *,
        started_workunits: tuple[Workunit, ...],
        completed_workunits: tuple[Workunit, ...],
        finished: bool,
        context: StreamingWorkunitContext,
    ) -> None:
        """Handle one batch of workunit data.

        :started_workunits: Workunits that have begun but not yet completed.
        :completed_workunits: Workunits that have run to completion.
        :finished: True on the final batch of workunit data reported to this callback.
        :context: Accessor for run-scoped functionality.
        """

    @property
    @abstractmethod
    def can_finish_async(self) -> bool:
        """Whether this callback may finish its work in the background after the Pants run
        has completed.

        Return False chiefly if the callback logs during its final (`finished=True`)
        invocation, as that output may end up in `.pants.d/workdir/pants.log` rather than
        the console, where it is harder for users to find. Otherwise prefer returning True
        so that Pants is not slowed down waiting for the callback at the end of the run.
        """
186

187

188
@dataclass(frozen=True)
class WorkunitsCallbackFactory:
    """Wraps a callable that constructs a WorkunitsCallback (or None to opt out).

    NB: The extra wrapping exists because subtyping is not supported in the return
    position of a rule. See #11354 for discussion of that limitation.
    """

    # Invoked once to build the callback; may return None to register nothing.
    callback_factory: Callable[[], WorkunitsCallback | None]
197

198

199
class WorkunitsCallbackFactories(tuple[WorkunitsCallbackFactory, ...]):
    """Immutable collection of every registered WorkunitsCallbackFactory."""
201

202

203
@union(in_scope_types=[EnvironmentName])
class WorkunitsCallbackFactoryRequest:
    """Union request for a particular WorkunitsCallbackFactory implementation."""
206

207

208
@rule(polymorphic=True)
async def construct_workunit_callback_factory(
    req: WorkunitsCallbackFactoryRequest, env_name: EnvironmentName
) -> WorkunitsCallbackFactory:
    """Build the WorkunitsCallbackFactory for the given request.

    NOTE(review): declared `polymorphic=True`, so the engine presumably dispatches to the
    concrete union member's rule; this stub body is not expected to execute — confirm
    against the rule-graph semantics.
    """
    raise NotImplementedError()
213

214

215
@rule
async def construct_workunits_callback_factories(
    union_membership: UnionMembership,
) -> WorkunitsCallbackFactories:
    """Collect one WorkunitsCallbackFactory per registered WorkunitsCallbackFactoryRequest
    union member, resolving them all concurrently."""
    request_types = union_membership.get(WorkunitsCallbackFactoryRequest)
    workunit_callback_factories = await concurrently(
        construct_workunit_callback_factory(
            **implicitly({request_type(): WorkunitsCallbackFactoryRequest})
        )
        for request_type in request_types
    )
    return WorkunitsCallbackFactories(workunit_callback_factories)
227

228

229
# -----------------------------------------------------------------------------------------------
230
# Streaming workunits handler
231
# -----------------------------------------------------------------------------------------------
232

233

234
class StreamingWorkunitHandler:
    """Drives each registered WorkunitsCallback from a dedicated polling thread.

    Intended for use as a context manager: polling starts on `__enter__` and is
    signalled to stop on `__exit__`.
    """

    def __init__(
        self,
        scheduler: SchedulerSession,
        run_tracker: RunTracker,
        callbacks: Iterable[WorkunitsCallback],
        options_bootstrapper: OptionsBootstrapper,
        specs: Specs,
        report_interval_seconds: float,
        allow_async_completion: bool,
        max_workunit_verbosity: LogLevel,
    ) -> None:
        # Poll from an isolated session clone so callback queries don't interfere
        # with the main session.
        scheduler = scheduler.isolated_shallow_clone("streaming_workunit_handler_session")
        self.callbacks = callbacks
        self.context = StreamingWorkunitContext(
            _scheduler=scheduler,
            _run_tracker=run_tracker,
            _specs=specs,
            _options_bootstrapper=options_bootstrapper,
        )
        if not callbacks:
            # Nothing to report to: don't spawn a polling thread at all.
            self.thread_runner = None
        else:
            # TODO(10092) The max verbosity should be a per-client setting, rather than a global
            #  setting.
            self.thread_runner = _InnerHandler(
                scheduler=scheduler,
                context=self.context,
                callbacks=self.callbacks,
                report_interval=report_interval_seconds,
                max_workunit_verbosity=max_workunit_verbosity,
                allow_async_completion=allow_async_completion,
            )

    def __enter__(self) -> None:
        if self.thread_runner:
            self.thread_runner.start()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        if not self.thread_runner:
            return
        self.thread_runner.end()
        # If the run is failing, wait for the handler so final workunit data is
        # reported before the exception propagates.
        if exc_type is not None:
            self.thread_runner.join()
285

286

287
class _InnerHandler(threading.Thread):
    """Daemon thread that periodically polls workunits and dispatches each batch to the
    registered callbacks."""

    def __init__(
        self,
        scheduler: Any,
        context: StreamingWorkunitContext,
        callbacks: Iterable[WorkunitsCallback],
        report_interval: float,
        max_workunit_verbosity: LogLevel,
        allow_async_completion: bool,
    ) -> None:
        super().__init__(daemon=True)
        self.scheduler = scheduler
        self.context = context
        self.stop_request = threading.Event()
        self.report_interval = report_interval
        self.callbacks = callbacks
        self.max_workunit_verbosity = max_workunit_verbosity
        # TODO: Have a thread per callback so that some callbacks can always finish async even
        #  if others must be finished synchronously.
        if not allow_async_completion:
            self.block_until_complete = True
        else:
            # A single callback that cannot finish async forces the whole handler to block.
            self.block_until_complete = any(
                cb.can_finish_async is False for cb in self.callbacks
            )
        # Capture the parent thread's engine thread locals. The thread has not been
        # started yet, so this still runs on the constructing (parent) thread.
        self.thread_locals = PyThreadLocals.get_for_current_thread()

    def poll_workunits(self, *, finished: bool) -> None:
        """Fetch the latest batch of workunit data and hand it to every callback."""
        batch = self.scheduler.poll_workunits(self.max_workunit_verbosity)
        for cb in self.callbacks:
            cb(
                started_workunits=batch["started"],
                completed_workunits=batch["completed"],
                finished=finished,
                context=self.context,
            )

    def run(self) -> None:
        # Propagate the parent thread's engine state (console, workunit stores, etc.)
        # onto this thread before polling anything.
        self.thread_locals.set_for_current_thread()
        while not self.stop_request.is_set():
            self.poll_workunits(finished=False)
            self.stop_request.wait(timeout=self.report_interval)
        # One final poll after the stop request. Note that this may run after the Pants
        # run has already completed, depending on whether the thread was joined or not.
        self.poll_workunits(finished=True)

    def end(self) -> None:
        """Request the polling loop to stop, waiting for it when async completion is off."""
        self.stop_request.set()
        if not self.block_until_complete:
            logger.debug(
                "Async completion is enabled: workunit callbacks will complete in the background."
            )
            return
        logger.debug(
            "Async completion is disabled: waiting for workunit callbacks to complete..."
        )
        super().join()
            )
346

347

348
def rules():
    """Return the query rules and collected rules this module contributes to the engine."""
    result = [
        QueryRule(WorkunitsCallbackFactories, (UnionMembership, EnvironmentName)),
        QueryRule(Targets, (Specs, OptionsBootstrapper, EnvironmentName)),
        QueryRule(Addresses, (Specs, OptionsBootstrapper, EnvironmentName)),
    ]
    result.extend(collect_rules())
    return result
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc