• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.64
/src/python/pants/pantsd/service/pants_service.py
1
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
import logging
1✔
5
import threading
1✔
6
import time
1✔
7
from abc import ABC, abstractmethod
1✔
8
from collections.abc import KeysView
1✔
9
from dataclasses import dataclass
1✔
10

11
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
1✔
12

13

14
class PantsService(ABC):
    """Abstract base class for services hosted by the Pants daemon.

    A service moves through the states described by the _ServiceState class. Transitions are
    requested by a calling thread that holds the Service `lifecycle_lock`; under that lock, the
    caller may signal a service to "pause", "run", or "terminate" (see _ServiceState for details).

    Before forking a pantsd, pantsd pauses all Services to ensure that no "relevant" locks are
    held (or awaited: see #6565) by threads that might not survive the fork. While paused, a
    Service must not have any threads running that might interact with any non-private locks.

    After forking, the pantsd (child) process should call `terminate()` to finish shutting the
    service down, and the parent process should call `resume()` so that the service runs again.
    """

    class ServiceError(Exception):
        """Exception type nested under PantsService."""

    def __init__(self):
        super().__init__()
        # Lifecycle state machine that every pause/resume/terminate call below delegates to.
        self._state = _ServiceState()
        # Services are named after their concrete class.
        self.name = type(self).__name__

    def setup(self, services: tuple["PantsService", ...]):
        """Hook invoked before `run`, for service->service or other side-effecting setup.

        The base implementation only stores the given `services` tuple on `self.services`.
        """
        self.services = services

    @abstractmethod
    def run(self):
        """Main entry-point of the service; invoked by the service runner."""

    def mark_pausing(self):
        """Ask the service to pause, returning without waiting for it to have paused.

        See the class and _ServiceState pydocs.
        """
        self._state.mark_pausing()

    def await_paused(self):
        """Wait for a service that was previously marked pausing to have paused.

        See the class and _ServiceState pydocs.
        """
        self._state.await_paused()

    def resume(self):
        """Ask the service to resume running, returning without waiting.

        See the class and _ServiceState pydocs.
        """
        self._state.mark_running()

    def terminate(self):
        """Ask the service to terminate, returning without waiting.

        See the class and _ServiceState pydocs.
        """
        self._state.mark_terminating()
×
72

73

74
class _ServiceState:
    """A threadsafe state machine for controlling a service running in another thread.

    The state machine represents two stable states:
      Running
      Paused
    And two transitional states:
      Pausing
      Terminating

    The methods of this class allow a caller to ask the Service to transition states, and then wait
    for those transitions to occur.

    A simplifying assumption is that there is one service thread that interacts with the state, and
    only one controlling thread. In the case of `pantsd`, the "one calling thread" condition is
    protected by the service `lifecycle_lock`.

    A complicating assumption is that while a service thread is `Paused`, it must be in a position
    where it could safely disappear and never come back. This is accounted for by having the service
    thread wait on a Condition variable while Paused: testing indicates that for multiple Pythons
    on both OSX and Linux, this does not result in poisoning of the associated Lock.
    """

    _RUNNING = "Running"
    _PAUSED = "Paused"
    _PAUSING = "Pausing"
    _TERMINATING = "Terminating"

    def __init__(self):
        """Creates a ServiceState in the Running state."""
        self._state = self._RUNNING
        self._lock = threading.Lock()
        # The condition shares `_lock`, so waiters and state changes are serialized by one lock.
        self._condition = threading.Condition(self._lock)

    def _set_state(self, state, *valid_states):
        """Moves to `state` and wakes all waiters.

        Must be called with `self._lock` held, because it notifies the condition built on it.

        If any `valid_states` are given, raises AssertionError unless the current state is one of
        them; with none given, the transition is unconditional.
        """
        if valid_states and self._state not in valid_states:
            raise AssertionError(f"Cannot move {self} to `{state}` while it is `{self._state}`.")
        self._state = state
        self._condition.notify_all()

    def await_paused(self, timeout=None):
        """Blocks until the service is in the Paused state, then returns True.

        If a timeout (in seconds) is specified, the method may return False to indicate a timeout:
        with no timeout it will always (eventually) return True. A timeout of 0 checks the state
        once without blocking.

        Raises AssertionError if the service is in neither the Pausing nor the Paused state.
        """
        # `is not None` (rather than truthiness) so that an explicit timeout of 0 is honored
        # instead of being treated as "wait forever". The monotonic clock keeps the deadline
        # immune to wall-clock adjustments.
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._lock:
            # Wait until the service transitions out of Pausing.
            while self._state != self._PAUSED:
                if self._state != self._PAUSING:
                    raise AssertionError(
                        f"Cannot wait for {self} to reach `{self._PAUSED}` "
                        f"while it is in `{self._state}`."
                    )
                remaining = None
                if deadline is not None:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        return False
                self._condition.wait(timeout=remaining)
            return True

    def maybe_pause(self, timeout=None):
        """Called by the service to indicate that it is pausable.

        If the service calls this method while the state is `Pausing`, the state will transition
        to `Paused`, and the service will block here until it is marked `Running` or `Terminating`.

        If the state is not currently `Pausing`, and a timeout is not passed (or is 0), this method
        returns immediately. If a timeout is passed, this method blocks up to that number of
        seconds to wait to transition to `Pausing`.
        """
        # A missing or zero timeout means "do not wait for Pausing at all".
        deadline = time.monotonic() + timeout if timeout else None
        with self._lock:
            while self._state != self._PAUSING:
                # If we've been terminated, or there is no time left to wait, return.
                if self._state == self._TERMINATING or deadline is None:
                    return
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return
                # Otherwise, wait for the state to change.
                self._condition.wait(timeout=remaining)

            # Set Paused, and then wait until we are no longer Paused.
            self._set_state(self._PAUSED, self._PAUSING)
            while self._state == self._PAUSED:
                self._condition.wait()

    def mark_pausing(self):
        """Requests that the service move to the Paused state, without waiting for it to do so.

        Raises if the service is not currently in the Running state.
        """
        with self._lock:
            self._set_state(self._PAUSING, self._RUNNING)

    def mark_running(self):
        """Moves the service to the Running state.

        Raises if the service is not currently in the Paused state.
        """
        with self._lock:
            self._set_state(self._RUNNING, self._PAUSED)

    def mark_terminating(self):
        """Requests that the service move to the Terminating state, without waiting for it to do
        so."""
        with self._lock:
            self._set_state(self._TERMINATING)

    @property
    def is_terminating(self):
        """Returns True if the Service should currently be terminating.

        NB: `Terminating` does not have an associated "terminated" state, because the caller uses
        liveness of the service thread to determine when a service is terminated.
        """
        with self._lock:
            return self._state == self._TERMINATING
×
194

195

196
@dataclass(frozen=True)
class PantsServices:
    """Holds a set of PantsService instances together with the thread running each one."""

    # Seconds to wait on each service thread's join() during shutdown().
    JOIN_TIMEOUT_SECONDS = 1

    _service_threads: dict[PantsService, threading.Thread]

    def __init__(self, services: tuple[PantsService, ...] = ()) -> None:
        """Sets up and starts a thread for each of the given services."""
        # The dataclass is frozen, so the field is assigned via object.__setattr__.
        object.__setattr__(self, "_service_threads", self._start(services))

    @classmethod
    def _make_thread(cls, service):
        """Builds (without starting) a daemon thread that will call `service.run`."""
        return threading.Thread(
            target=service.run,
            name=f"{service.__class__.__name__}Thread",
            daemon=True,
        )

    @classmethod
    def _start(cls, services: tuple[PantsService, ...]) -> dict[PantsService, threading.Thread]:
        """Launch a thread per service.

        Every service has `setup` called before any thread is started. Returns the mapping from
        each service to its started thread.
        """
        for svc in services:
            logger.debug(f"setting up service {svc}")
            svc.setup(services)

        threads_by_service = {}
        for svc in services:
            threads_by_service[svc] = cls._make_thread(svc)

        for svc, thread in threads_by_service.items():
            logger.debug(f"starting service {svc}")
            thread.start()

        return threads_by_service

    @property
    def services(self) -> KeysView[PantsService]:
        """A view over the services held by this collection."""
        return self._service_threads.keys()

    def are_all_alive(self) -> bool:
        """Return true if all services threads are still alive, and false if any have died.

        This method does not have side effects: if one service thread has died, the rest should be
        killed and joined via `self.shutdown()`.
        """
        for svc, thread in self._service_threads.items():
            if thread.is_alive():
                continue
            # Report the first dead thread found and stop checking.
            logger.error(f"service failure for {svc}.")
            return False
        return True

    def shutdown(self) -> None:
        """Shut down and join all service threads."""
        # Signal every service first, so that all of them begin terminating before any join.
        for svc in self._service_threads:
            svc.terminate()
        for svc, thread in self._service_threads.items():
            logger.debug(f"terminating pantsd service: {svc}")
            thread.join(self.JOIN_TIMEOUT_SECONDS)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc