• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/bsp/protocol.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
UNCOV
3
from __future__ import annotations
×
4

UNCOV
5
import logging
×
UNCOV
6
from concurrent.futures import Future
×
UNCOV
7
from typing import Any, BinaryIO, ClassVar, Protocol
×
8

UNCOV
9
from pylsp_jsonrpc.endpoint import Endpoint  # type: ignore[import-untyped]
×
UNCOV
10
from pylsp_jsonrpc.exceptions import (  # type: ignore[import-untyped]
×
11
    JsonRpcException,
12
    JsonRpcInvalidRequest,
13
    JsonRpcMethodNotFound,
14
)
UNCOV
15
from pylsp_jsonrpc.streams import (  # type: ignore[import-untyped]
×
16
    JsonRpcStreamReader,
17
    JsonRpcStreamWriter,
18
)
19

UNCOV
20
from pants.bsp.context import BSPContext
×
UNCOV
21
from pants.bsp.spec.notification import BSPNotification
×
UNCOV
22
from pants.core.environments.rules import determine_bootstrap_environment
×
UNCOV
23
from pants.engine.environment import EnvironmentName
×
UNCOV
24
from pants.engine.fs import Workspace
×
UNCOV
25
from pants.engine.internals.scheduler import SchedulerSession
×
UNCOV
26
from pants.engine.internals.selectors import Params
×
UNCOV
27
from pants.engine.unions import UnionMembership, union
×
28

UNCOV
29
_logger = logging.getLogger(__name__)
×
30

31

UNCOV
32
class BSPRequestTypeProtocol(Protocol):
×
33
    @classmethod
34
    def from_json_dict(cls, d: dict[str, Any]) -> Any: ...
35

36

UNCOV
37
class BSPResponseTypeProtocol(Protocol):
×
38
    def to_json_dict(self) -> dict[str, Any]: ...
39

40

UNCOV
41
@union(in_scope_types=[EnvironmentName])
×
UNCOV
42
class BSPHandlerMapping:
×
43
    """Union type for rules to register handlers for BSP methods."""
44

45
    # Name of the JSON-RPC method to be handled.
UNCOV
46
    method_name: ClassVar[str]
×
47

48
    # Type requested from the engine. This will be provided as the "subject" of an engine query.
49
    # Must implement class method `from_json_dict`.
UNCOV
50
    request_type: type[BSPRequestTypeProtocol]
×
51

52
    # Type produced by the handler rule. This will be requested as the "product" of the engine query.
53
    # Must implement instance method `to_json_dict`.
UNCOV
54
    response_type: type[BSPResponseTypeProtocol]
×
55

56
    # True if this handler is for a notification.
57
    # TODO: Consider how to pass notifications (which do not have responses) to the engine rules.
UNCOV
58
    is_notification: bool = False
×
59

60

UNCOV
61
def _make_error_future(exc: Exception) -> Future:
×
UNCOV
62
    fut: Future = Future()
×
UNCOV
63
    fut.set_exception(exc)
×
UNCOV
64
    return fut
×
65

66

UNCOV
67
class BSPConnection:
×
UNCOV
68
    _INITIALIZE_METHOD_NAME = "build/initialize"
×
UNCOV
69
    _SHUTDOWN_METHOD_NAME = "build/shutdown"
×
UNCOV
70
    _EXIT_NOTIFICATION_NAME = "build/exit"
×
71

UNCOV
72
    def __init__(
×
73
        self,
74
        scheduler_session: SchedulerSession,
75
        union_membership: UnionMembership,
76
        context: BSPContext,
77
        inbound: BinaryIO,
78
        outbound: BinaryIO,
79
        max_workers: int = 5,
80
    ) -> None:
UNCOV
81
        self._scheduler_session = scheduler_session
×
82
        # TODO: We might eventually want to make this configurable.
UNCOV
83
        self._env_name = determine_bootstrap_environment(self._scheduler_session)
×
UNCOV
84
        self._inbound = JsonRpcStreamReader(inbound)
×
UNCOV
85
        self._outbound = JsonRpcStreamWriter(outbound)
×
UNCOV
86
        self._context: BSPContext = context
×
UNCOV
87
        self._endpoint = Endpoint(self, self._send_outbound_message, max_workers=max_workers)
×
88

UNCOV
89
        self._handler_mappings: dict[str, type[BSPHandlerMapping]] = {}
×
UNCOV
90
        impls = union_membership.get(BSPHandlerMapping)
×
UNCOV
91
        for impl in impls:
×
UNCOV
92
            self._handler_mappings[impl.method_name] = impl
×
93

UNCOV
94
    def run(self) -> None:
×
95
        """Run the listener for inbound JSON-RPC messages."""
UNCOV
96
        self._inbound.listen(self._received_inbound_message)
×
97

UNCOV
98
    def _received_inbound_message(self, msg):
×
99
        """Process each inbound JSON-RPC message."""
UNCOV
100
        _logger.info(f"_received_inbound_message: msg={msg}")
×
UNCOV
101
        self._endpoint.consume(msg)
×
102

UNCOV
103
    def _send_outbound_message(self, msg):
×
UNCOV
104
        _logger.info(f"_send_outbound_message: msg={msg}")
×
UNCOV
105
        self._outbound.write(msg)
×
106

107
    # TODO: Figure out how to run this on the `Endpoint`'s thread pool by returning a callable. For now, we
108
    # need to return errors as futures given that `Endpoint` only handles exceptions returned that way versus using a try ... except block.
UNCOV
109
    def _handle_inbound_message(self, *, method_name: str, params: Any):
×
110
        # If the connection is not yet initialized and this is not the initialization request, BSP requires
111
        # returning an error for methods (and to discard all notifications).
112
        #
113
        # Concurrency: This method can be invoked from multiple threads (for each individual request). By returning
114
        # an error for all other requests, only the thread running the initialization RPC should be able to proceed.
115
        # This ensures that we can safely call `initialize_connection` on the BSPContext with the client-supplied
116
        # init parameters without worrying about multiple threads. (Not entirely true though as this does not handle
117
        # the client making multiple concurrent initialization RPCs, but which would violate the protocol in any case.)
UNCOV
118
        if (
×
119
            not self._context.is_connection_initialized
120
            and method_name != self._INITIALIZE_METHOD_NAME
121
        ):
UNCOV
122
            return _make_error_future(
×
123
                JsonRpcException(
124
                    code=-32002, message=f"Client must first call `{self._INITIALIZE_METHOD_NAME}`."
125
                )
126
            )
127

128
        # Handle the `build/shutdown` method and `build/exit` notification.
UNCOV
129
        if method_name == self._SHUTDOWN_METHOD_NAME:
×
130
            # Return no-op success for the `build/shutdown` method. This doesn't actually cause the server to
131
            # exit. That will occur once the client sends the `build/exit` notification.
132
            return None
×
UNCOV
133
        elif method_name == self._EXIT_NOTIFICATION_NAME:
×
134
            # The `build/exit` notification directs the BSP server to immediately exit.
135
            # The read-dispatch loop will exit once it notices that the inbound handle is closed. So close the
136
            # inbound handle (and outbound handle for completeness) and then return to the dispatch loop
137
            # to trigger the exit.
138
            self._inbound.close()
×
139
            self._outbound.close()
×
140
            return None
×
141

UNCOV
142
        method_mapping = self._handler_mappings.get(method_name)
×
UNCOV
143
        if not method_mapping:
×
UNCOV
144
            return _make_error_future(JsonRpcMethodNotFound.of(method_name))
×
145

UNCOV
146
        try:
×
UNCOV
147
            request = method_mapping.request_type.from_json_dict(params)
×
148
        except Exception:
×
149
            return _make_error_future(JsonRpcInvalidRequest())
×
150

151
        # TODO: This should not be necessary: see https://github.com/pantsbuild/pants/issues/15435.
UNCOV
152
        self._scheduler_session.new_run_id()
×
153

UNCOV
154
        workspace = Workspace(self._scheduler_session)
×
UNCOV
155
        params = Params(request, workspace, self._env_name)
×
UNCOV
156
        execution_request = self._scheduler_session.execution_request(
×
157
            requests=[(method_mapping.response_type, params)],
158
        )
UNCOV
159
        (result,) = self._scheduler_session.execute(execution_request)
×
160
        # Initialize the BSPContext with the client-supplied init parameters. See earlier comment on why this
161
        # call to `BSPContext.initialize_connection` is safe.
UNCOV
162
        if method_name == self._INITIALIZE_METHOD_NAME:
×
UNCOV
163
            self._context.initialize_connection(request, self.notify_client)
×
UNCOV
164
        return result.to_json_dict()
×
165

166
    # Called by `Endpoint` to dispatch requests and notifications.
167
    # TODO: Should probably vendor `Endpoint` so we can detect notifications versus method calls, which
168
    # matters when ignoring unknown notifications versus erroring for unknown methods.
UNCOV
169
    def __getitem__(self, method_name):
×
UNCOV
170
        def handler(params):
×
UNCOV
171
            return self._handle_inbound_message(method_name=method_name, params=params)
×
172

UNCOV
173
        return handler
×
174

UNCOV
175
    def notify_client(self, notification: BSPNotification) -> None:
×
176
        try:
×
177
            self._endpoint.notify(notification.notification_name, notification.to_json_dict())
×
178
        except Exception as ex:
×
179
            _logger.warning(f"Received exception while notifying BSP client: {ex}")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc