• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 22740642519

05 Mar 2026 11:00PM UTC coverage: 52.677% (-40.3%) from 92.931%
22740642519

Pull #23157

github

web-flow
Merge 2aa18e6d4 into f0030f5e7
Pull Request #23157: [pants ng] Partition source files by config.

31678 of 60136 relevant lines covered (52.68%)

0.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.71
/src/python/pants/pantsd/service/pants_service.py
1
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3
from __future__ import annotations
1✔
4

5
import logging
1✔
6
import threading
1✔
7
import time
1✔
8
from abc import ABC, abstractmethod
1✔
9
from collections.abc import KeysView
1✔
10
from dataclasses import dataclass
1✔
11

12
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
1✔
13

14

15
class PantsService(ABC):
    """Abstract base class for services hosted by the Pants daemon.

    A service moves through the states described by `_ServiceState`. Transitions are requested
    by a controlling thread which holds the Service `lifecycle_lock`; while holding that lock the
    caller may ask the service to "pause", "run", or "terminate" (see `_ServiceState`).

    Before pantsd forks a new pantsd it pauses every Service, so that no "relevant" lock is held
    (or awaited: see #6565) by a thread that might not survive the fork. A paused Service must
    therefore have no running threads that could touch any non-private lock.

    Once the fork has happened, the child (pantsd) process is expected to call `terminate()` to
    complete the shutdown of the service, while the parent process calls `resume()` so that the
    service carries on running.
    """

    class ServiceError(Exception):
        """Error type namespaced under `PantsService`."""

    def __init__(self):
        super().__init__()
        # Every service starts with a fresh state machine and is named after its concrete class.
        self._state = _ServiceState()
        self.name = type(self).__name__

    def setup(self, services: tuple[PantsService, ...]):
        """Hook invoked ahead of `run`, for service->service wiring or other side effects.

        The base implementation only stores the given `services` tuple on the instance.
        """
        self.services = services

    @abstractmethod
    def run(self):
        """Main entry point of the service; invoked by the service runner."""

    def mark_pausing(self):
        """Ask the service to pause, returning without waiting for the pause to happen.

        See the class and _ServiceState pydocs.
        """
        state = self._state
        state.mark_pausing()

    def await_paused(self):
        """Block until a service that was previously marked as pausing has paused.

        See the class and _ServiceState pydocs.
        """
        state = self._state
        state.await_paused()

    def resume(self):
        """Ask the service to start running again, returning without waiting.

        See the class and _ServiceState pydocs.
        """
        state = self._state
        state.mark_running()

    def terminate(self):
        """Ask the service to terminate, returning without waiting.

        See the class and _ServiceState pydocs.
        """
        state = self._state
        state.mark_terminating()
×
73

74

75
class _ServiceState:
    """A threadsafe state machine for controlling a service running in another thread.

    The state machine represents two stable states:
      Running
      Paused
    And two transitional states:
      Pausing
      Terminating

    The methods of this class allow a caller to ask the Service to transition states, and then wait
    for those transitions to occur.

    A simplifying assumption is that there is one service thread that interacts with the state, and
    only one controlling thread. In the case of `pantsd`, the "one calling thread" condition is
    protected by the service `lifecycle_lock`.

    A complicating assumption is that while a service thread is `Paused`, it must be in a position
    where it could safely disappear and never come back. This is accounted for by having the service
    thread wait on a Condition variable while Paused: testing indicates that for multiple Pythons
    on both OSX and Linux, this does not result in poisoning of the associated Lock.
    """

    _RUNNING = "Running"
    _PAUSED = "Paused"
    _PAUSING = "Pausing"
    _TERMINATING = "Terminating"

    def __init__(self):
        """Creates a ServiceState in the Running state."""
        self._state = self._RUNNING
        self._lock = threading.Lock()
        # The condition is built on `_lock`, so waiting on it releases that same lock.
        self._condition = threading.Condition(self._lock)

    def _set_state(self, state, *valid_states):
        """Moves to `state` and wakes every thread waiting on the condition.

        The caller must hold `self._lock`, because `notify_all()` requires the underlying lock.

        If `valid_states` is non-empty and the current state is not one of them, raises
        AssertionError and leaves the state unchanged.
        """
        if valid_states and self._state not in valid_states:
            raise AssertionError(f"Cannot move {self} to `{state}` while it is `{self._state}`.")
        self._state = state
        self._condition.notify_all()

    def await_paused(self, timeout: float | None = None) -> bool:
        """Blocks until the service is in the Paused state, then returns True.

        If a timeout (in seconds) is specified, the method may return False to indicate a timeout:
        with no timeout it will always (eventually) return True. A timeout of 0 (or less) checks
        the state once and returns without blocking.

        Raises AssertionError if the service is in neither the Pausing nor the Paused state.
        """
        # Use the monotonic clock so that wall-clock adjustments cannot stretch or cut short the
        # wait, and compare against None so that `timeout=0` is a poll rather than "wait forever".
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._lock:
            # Wait until the service transitions out of Pausing.
            while self._state != self._PAUSED:
                if self._state != self._PAUSING:
                    raise AssertionError(
                        f"Cannot wait for {self} to reach `{self._PAUSED}` while it is in `{self._state}`."
                    )
                remaining = None if deadline is None else deadline - time.monotonic()
                if remaining is not None and remaining <= 0:
                    return False
                self._condition.wait(timeout=remaining)
            return True

    def maybe_pause(self, timeout: float | None = None) -> None:
        """Called by the service to indicate that it is pausable.

        If the service calls this method while the state is `Pausing`, the state will transition
        to `Paused`, and the service will block here until it is marked `Running` or `Terminating`.

        If the state is not currently `Pausing`, and a timeout is not passed (or is 0), this method
        returns immediately. If a timeout is passed, this method blocks up to that number of
        seconds to wait to transition to `Pausing`.
        """
        # A missing or zero timeout means "do not wait for Pausing"; otherwise track a deadline on
        # the monotonic clock so that wall-clock adjustments do not affect the wait.
        deadline = time.monotonic() + timeout if timeout else None
        with self._lock:
            while self._state != self._PAUSING:
                # If we've been terminated, or there is no time (left) to wait, return.
                if self._state == self._TERMINATING or deadline is None:
                    return
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return
                # Otherwise, wait for the state to change.
                self._condition.wait(timeout=remaining)

            # Set Paused, and then wait until we are no longer Paused.
            self._set_state(self._PAUSED, self._PAUSING)
            while self._state == self._PAUSED:
                self._condition.wait()

    def mark_pausing(self):
        """Requests that the service move to the Paused state, without waiting for it to do so.

        Raises AssertionError if the service is not currently in the Running state.
        """
        with self._lock:
            self._set_state(self._PAUSING, self._RUNNING)

    def mark_running(self):
        """Moves the service to the Running state.

        Raises AssertionError if the service is not currently in the Paused state.
        """
        with self._lock:
            self._set_state(self._RUNNING, self._PAUSED)

    def mark_terminating(self):
        """Requests that the service move to the Terminating state, without waiting for it to do
        so.

        This transition is accepted from any state.
        """
        with self._lock:
            self._set_state(self._TERMINATING)

    @property
    def is_terminating(self):
        """Returns True if the Service should currently be terminating.

        NB: `Terminating` does not have an associated "terminated" state, because the caller uses
        liveness of the service thread to determine when a service is terminated.
        """
        with self._lock:
            return self._state == self._TERMINATING
×
193

194

195
@dataclass(frozen=True)
class PantsServices:
    """A collection of running PantsServices threads.

    Constructing an instance sets up every given service and starts one daemon thread per
    service; `shutdown()` asks the services to terminate and joins their threads.
    """

    # Seconds that `shutdown()` waits for each service thread to exit. Left unannotated, so the
    # dataclass machinery treats it as a plain class attribute rather than a field.
    JOIN_TIMEOUT_SECONDS = 1

    # Maps each service to the thread that executes its `run()` method.
    _service_threads: dict[PantsService, threading.Thread]

    def __init__(self, services: tuple[PantsService, ...] = ()) -> None:
        """Sets up and starts the given services (none by default)."""
        # The dataclass is frozen, so the field has to be assigned via `object.__setattr__`.
        object.__setattr__(self, "_service_threads", self._start(services))

    @classmethod
    def _make_thread(cls, service: PantsService) -> threading.Thread:
        """Creates (but does not start) a daemon thread that will execute `service.run`."""
        name = f"{service.__class__.__name__}Thread"
        return threading.Thread(target=service.run, name=name, daemon=True)

    @classmethod
    def _start(cls, services: tuple[PantsService, ...]) -> dict[PantsService, threading.Thread]:
        """Launch a thread per service.

        Every service has `setup()` called (with the full tuple of services) before any thread is
        started. Returns the mapping from each service to its started thread.
        """

        for service in services:
            logger.debug("setting up service %s", service)
            service.setup(services)

        service_thread_map = {service: cls._make_thread(service) for service in services}

        for service, service_thread in service_thread_map.items():
            logger.debug("starting service %s", service)
            service_thread.start()

        return service_thread_map

    @property
    def services(self) -> KeysView[PantsService]:
        """A view of the services held by this collection."""
        return self._service_threads.keys()

    def are_all_alive(self) -> bool:
        """Return true if all services threads are still alive, and false if any have died.

        This method does not have side effects: if one service thread has died, the rest should be
        killed and joined via `self.shutdown()`.
        """
        for service, service_thread in self._service_threads.items():
            if not service_thread.is_alive():
                logger.error("service failure for %s.", service)
                return False
        return True

    def shutdown(self) -> None:
        """Shut down and join all service threads.

        Termination is requested from every service before any thread is joined. Each join waits
        at most `JOIN_TIMEOUT_SECONDS`, so a thread that does not exit in time is not waited on
        any further.
        """
        for service in self._service_threads:
            service.terminate()
        for service, service_thread in self._service_threads.items():
            logger.debug("terminating pantsd service: %s", service)
            service_thread.join(self.JOIN_TIMEOUT_SECONDS)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc