• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18517631058

15 Oct 2025 04:18AM UTC coverage: 69.207% (-11.1%) from 80.267%
18517631058

Pull #22745

github

web-flow
Merge 642a76ca1 into 99919310e
Pull Request #22745: [windows] Add windows support in the stdio crate.

53815 of 77759 relevant lines covered (69.21%)

2.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.07
/src/python/pants/pantsd/service/pants_service.py
1
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
import logging
7✔
5
import threading
7✔
6
import time
7✔
7
from abc import ABC, abstractmethod
7✔
8
from collections.abc import KeysView
7✔
9
from dataclasses import dataclass
7✔
10

11
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
7✔
12

13

14
class PantsService(ABC):
    """Pants daemon service base class.

    The service lifecycle is made up of states described in the _ServiceState class, and controlled
    by a calling thread that is holding the Service `lifecycle_lock`. Under that lock, a caller
    can signal a service to "pause", "run", or "terminate" (see _ServiceState for more details).

    pantsd pauses all Services before forking a pantsd in order to ensure that no "relevant"
    locks are held (or awaited: see #6565) by threads that might not survive the fork. While paused,
    a Service must not have any threads running that might interact with any non-private locks.

    After forking, the pantsd (child) process should call `terminate()` to finish shutting down
    the service, and the parent process should call `resume()` to cause the service to resume running.
    """

    class ServiceError(Exception):
        """Error type namespaced under the service class."""

    def __init__(self) -> None:
        """Record the concrete class name and create a fresh lifecycle state machine."""
        super().__init__()
        self.name = self.__class__.__name__
        self._state = _ServiceState()

    def setup(self, services: tuple["PantsService", ...]) -> None:
        """Called before `run` to allow for service->service or other side-effecting setup.

        The base implementation only stores `services` on the instance as `self.services`.
        """
        self.services = services

    @abstractmethod
    def run(self) -> None:
        """The main entry-point for the service called by the service runner."""

    def mark_pausing(self) -> None:
        """Triggers pausing of the service, without waiting for it to have paused.

        Delegates to `_ServiceState.mark_pausing`. See the class and _ServiceState pydocs.
        """
        self._state.mark_pausing()

    def await_paused(self) -> None:
        """Once a service has been marked pausing, waits for it to have paused.

        Delegates to `_ServiceState.await_paused` without a timeout; the delegate's result is
        not returned. See the class and _ServiceState pydocs.
        """
        self._state.await_paused()

    def resume(self) -> None:
        """Triggers the service to resume running, without waiting.

        Delegates to `_ServiceState.mark_running`. See the class and _ServiceState pydocs.
        """
        self._state.mark_running()

    def terminate(self) -> None:
        """Triggers termination of the service, without waiting.

        Delegates to `_ServiceState.mark_terminating`. See the class and _ServiceState pydocs.
        """
        self._state.mark_terminating()
72

73

74
class _ServiceState:
    """A threadsafe state machine for controlling a service running in another thread.

    The state machine represents two stable states:
      Running
      Paused
    And two transitional states:
      Pausing
      Terminating

    The methods of this class allow a caller to ask the Service to transition states, and then wait
    for those transitions to occur.

    A simplifying assumption is that there is one service thread that interacts with the state, and
    only one controlling thread. In the case of `pantsd`, the "one calling thread" condition is
    protected by the service `lifecycle_lock`.

    A complicating assumption is that while a service thread is `Paused`, it must be in a position
    where it could safely disappear and never come back. This is accounted for by having the service
    thread wait on a Condition variable while Paused: testing indicates that for multiple Pythons
    on both OSX and Linux, this does not result in poisoning of the associated Lock.
    """

    _RUNNING = "Running"
    _PAUSED = "Paused"
    _PAUSING = "Pausing"
    _TERMINATING = "Terminating"

    def __init__(self):
        """Creates a ServiceState in the Running state."""
        self._state = self._RUNNING
        self._lock = threading.Lock()
        # The condition shares `_lock`, so waiters release it while blocked.
        self._condition = threading.Condition(self._lock)

    def _set_state(self, state, *valid_states):
        """Moves to `state` and wakes every thread waiting on the condition.

        Must be called with `self._lock` held, because `notify_all` requires the lock.

        If `valid_states` is non-empty, raises AssertionError unless the current state is one
        of them. With no `valid_states`, the transition is unconditional.
        """
        if valid_states and self._state not in valid_states:
            raise AssertionError(f"Cannot move {self} to `{state}` while it is `{self._state}`.")
        self._state = state
        self._condition.notify_all()

    def await_paused(self, timeout=None):
        """Blocks until the service is in the Paused state, then returns True.

        If a timeout (in seconds) is specified, the method may return False to indicate a
        timeout: with no timeout (`None`) it will always (eventually) return True. A timeout
        of 0 polls once without blocking.

        Raises if the service is not currently in the Pausing state.
        """
        # Use the monotonic clock so that wall-clock adjustments cannot distort the deadline.
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._lock:
            # Wait until the service transitions out of Pausing.
            while self._state != self._PAUSED:
                if self._state != self._PAUSING:
                    raise AssertionError(
                        "Cannot wait for {} to reach `{}` while it is in `{}`.".format(
                            self, self._PAUSED, self._state
                        )
                    )
                if deadline is None:
                    self._condition.wait()
                    continue
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                self._condition.wait(timeout=remaining)
            return True

    def maybe_pause(self, timeout=None):
        """Called by the service to indicate that it is pausable.

        If the service calls this method while the state is `Pausing`, the state will transition
        to `Paused`, and the service will block here until it is marked `Running` or `Terminating`.

        If the state is not currently `Pausing`, and a timeout is not passed, this method returns
        immediately. If a timeout is passed, this method blocks up to that number of seconds to wait
        to transition to `Pausing`.
        """
        # A timeout of `None` or 0 both mean "do not wait for a pause request".
        deadline = time.monotonic() + timeout if timeout else None
        with self._lock:
            while self._state != self._PAUSING:
                # If we've been terminated, or there is no time left to wait, return.
                if self._state == self._TERMINATING or deadline is None:
                    return
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return
                # Otherwise, wait for the state to change.
                self._condition.wait(timeout=remaining)

            # Set Paused, and then wait until we are no longer Paused.
            self._set_state(self._PAUSED, self._PAUSING)
            while self._state == self._PAUSED:
                self._condition.wait()

    def mark_pausing(self):
        """Requests that the service move to the Paused state, without waiting for it to do so.

        Raises if the service is not currently in the Running state.
        """
        with self._lock:
            self._set_state(self._PAUSING, self._RUNNING)

    def mark_running(self):
        """Moves the service to the Running state.

        Raises if the service is not currently in the Paused state.
        """
        with self._lock:
            self._set_state(self._RUNNING, self._PAUSED)

    def mark_terminating(self):
        """Requests that the service move to the Terminating state, without waiting for it to do
        so.

        This transition is allowed from any state.
        """
        with self._lock:
            self._set_state(self._TERMINATING)

    @property
    def is_terminating(self):
        """Returns True if the Service should currently be terminating.

        NB: `Terminating` does not have an associated "terminated" state, because the caller uses
        liveness of the service thread to determine when a service is terminated.
        """
        with self._lock:
            return self._state == self._TERMINATING
194

195

196
@dataclass(frozen=True)
class PantsServices:
    """A collection of running PantsServices threads."""

    # Seconds to wait for each service thread to join during `shutdown`.
    JOIN_TIMEOUT_SECONDS = 1

    # Maps each service to the thread that executes its `run` method.
    _service_threads: dict[PantsService, threading.Thread]

    def __init__(self, services: tuple[PantsService, ...] = ()) -> None:
        """Set up every service and start one thread per service.

        The explicit `__init__` replaces the dataclass-generated one, so the field is populated
        from the started threads rather than passed in by the caller.
        """
        # The dataclass is frozen, so normal attribute assignment would raise.
        object.__setattr__(self, "_service_threads", self._start(services))

    @classmethod
    def _make_thread(cls, service):
        """Build (but do not start) a daemon thread that will execute `service.run`."""
        return threading.Thread(
            target=service.run,
            name=f"{service.__class__.__name__}Thread",
            daemon=True,
        )

    @classmethod
    def _start(cls, services: tuple[PantsService, ...]) -> dict[PantsService, threading.Thread]:
        """Launch a thread per service.

        Every service is set up before any thread is created or started.
        """
        for service in services:
            logger.debug("setting up service %s", service)
            service.setup(services)

        service_thread_map = {service: cls._make_thread(service) for service in services}

        for service, service_thread in service_thread_map.items():
            logger.debug("starting service %s", service)
            service_thread.start()

        return service_thread_map

    @property
    def services(self) -> KeysView[PantsService]:
        """A view of the services held by this collection."""
        return self._service_threads.keys()

    def are_all_alive(self) -> bool:
        """Return true if all services threads are still alive, and false if any have died.

        This method does not have side effects: if one service thread has died, the rest should be
        killed and joined via `self.shutdown()`.
        """
        for service, service_thread in self._service_threads.items():
            if not service_thread.is_alive():
                # Only the first dead service found is reported.
                logger.error("service failure for %s.", service)
                return False
        return True

    def shutdown(self) -> None:
        """Shut down and join all service threads.

        Every service is asked to terminate before any thread is joined. Each join waits at
        most `JOIN_TIMEOUT_SECONDS`.
        """
        for service in self._service_threads:
            service.terminate()
        for service, service_thread in self._service_threads.items():
            logger.debug("terminating pantsd service: %s", service)
            service_thread.join(self.JOIN_TIMEOUT_SECONDS)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc