• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 15049844454

15 May 2025 04:07PM UTC coverage: 90.446% (+0.04%) from 90.41%
15049844454

Pull #9345

github

web-flow
Merge 9e4071f83 into 2a64cd4e9
Pull Request #9345: feat: add serialization to `State` / move `State` to utils

10981 of 12141 relevant lines covered (90.45%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.33
haystack/components/connectors/openapi_service.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6
from collections import defaultdict
1✔
7
from copy import copy
1✔
8
from typing import Any, Dict, List, Optional, Union
1✔
9

10
from haystack import component, default_from_dict, default_to_dict
1✔
11
from haystack.dataclasses import ChatMessage, ChatRole
1✔
12
from haystack.lazy_imports import LazyImport
1✔
13

14
with LazyImport("Run 'pip install openapi3'") as openapi_imports:
    import requests
    from openapi3 import OpenAPI
    from openapi3.errors import UnexpectedResponseError
    from openapi3.paths import Operation

    # Patch the request method to add support for the proper raw_response handling
    # If you see that https://github.com/Dorthu/openapi3/pull/124/
    # is merged, we can remove this patch - notify authors of this code
    def patch_request(
        self,
        base_url: str,
        *,
        data: Optional[Any] = None,
        parameters: Optional[Dict[str, Any]] = None,
        raw_response: bool = False,
        security: Optional[Dict[str, str]] = None,
        session: Optional[Any] = None,
        verify: Union[bool, str] = True,
    ) -> Optional[Any]:
        """
        Sends an HTTP request as described by this path.

        :param base_url: The URL to append this operation's path to when making
                         the call.
        :param data: The request body to send.
        :param parameters: The parameters used to create the path.
        :param raw_response: If true, return the raw response instead of validating
                             and extrapolating it.
        :param security: The security scheme to use, and the values it needs to
                         process successfully.
        :param session: A persistent request session.
        :param verify: If we should do an ssl verification on the request or not.
                       In case str was provided, will use that as the CA.
        :return: The response data, either raw or processed depending on raw_response flag.
                 `None` when the matched response declares no content.
        :raises ValueError: If none of the supplied security schemes matches a security requirement of the
                            operation, or if a required request body is missing.
        :raises UnexpectedResponseError: If the status code is not declared in the spec and there is no
                                         "default" response.
        :raises RuntimeError: If the returned Content-Type is not one of the declared media types
                              (only checked when raw_response is false).
        :raises NotImplementedError: If a declared, non-JSON media type is returned and raw_response is false.
        """
        # Set request method (e.g. 'GET')
        self._request = requests.Request(self.path[-1])

        # Set self._request.url to base_url w/ path
        self._request.url = base_url + self.path[-2]

        parameters = parameters or {}
        security = security or {}

        if security and self.security:
            # Remember whether ANY supplied scheme matched a requirement. The marker must not be reset per
            # scheme, otherwise an earlier successful match is forgotten when a later scheme does not match.
            security_requirement = None
            for scheme, value in security.items():
                for r in self.security:
                    if r.name == scheme:
                        security_requirement = r
                        self._request_handle_secschemes(r, value)

            if security_requirement is None:
                # self.security is iterated above as a sequence of requirement objects, so the accepted
                # names are collected from the `name` attribute of each entry.
                err_msg = """No security requirement satisfied (accepts {}) \
                          """.format(", ".join(str(r.name) for r in self.security))
                raise ValueError(err_msg)

        if self.requestBody:
            if self.requestBody.required and data is None:
                err_msg = "Request Body is required but none was provided."
                raise ValueError(err_msg)

            self._request_handle_body(data)

        self._request_handle_parameters(parameters)

        if session is None:
            session = self._session

        # send the prepared request
        result = session.send(self._request.prepare(), verify=verify)

        # spec enforces these are strings
        status_code = str(result.status_code)

        # find the response model in spec we received
        expected_response = None
        if status_code in self.responses:
            expected_response = self.responses[status_code]
        elif "default" in self.responses:
            expected_response = self.responses["default"]

        if expected_response is None:
            raise UnexpectedResponseError(result, self)

        # if we got back a valid response code (or there was a default) and no
        # response content was expected, return None
        if expected_response.content is None:
            return None

        # A response without a Content-Type header must not crash with a bare KeyError: treat it as an
        # unknown media type and let the checks below decide what to do with it.
        received_content_type = result.headers.get("Content-Type", "")
        content_type = received_content_type
        if ";" in content_type:
            # if the content type that came in included an encoding, we'll ignore
            # it for now (requests has already parsed it for us) and only look at
            # the MIME type when determining if an expected content type was returned.
            content_type = content_type.split(";")[0].strip()

        expected_media = expected_response.content.get(content_type, None)

        # If raw_response is True, return the raw text or json based on content type
        if raw_response:
            if "application/json" in content_type:
                return result.json()
            return result.text

        if expected_media is None and "/" in content_type:
            # accept media type ranges in the spec. the most specific matching
            # type should always be chosen, but if we do not have a match here
            # a generic range should be accepted if one is provided
            # https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#response-object

            generic_type = content_type.split("/")[0] + "/*"
            expected_media = expected_response.content.get(generic_type, None)

        if expected_media is None:
            err_msg = """Unexpected Content-Type {} returned for operation {} \
                         (expected one of {})"""
            err_var = received_content_type, self.operationId, ",".join(expected_response.content.keys())

            raise RuntimeError(err_msg.format(*err_var))

        if content_type.lower() == "application/json":
            return expected_media.schema.model(result.json())

        raise NotImplementedError("Only application/json content type is supported")

    # Apply the patch
    Operation.request = patch_request
144

145

146
@component
class OpenAPIServiceConnector:
    """
    A component which connects the Haystack framework to OpenAPI services.

    The `OpenAPIServiceConnector` component connects the Haystack framework to OpenAPI services, enabling it to call
    operations as defined in the OpenAPI specification of the service.

    It integrates with the `ChatMessage` dataclass: the tool calls carried by the last message determine the
    operations to be called and the arguments to be passed. For each tool call, the tool name is resolved to an
    operation of the OpenAPI service and the tool arguments are sent as its parameters and/or request body. The
    response from the service is returned as a `ChatMessage`.

    Before using this component, users usually resolve service endpoint parameters with a help of
    `OpenAPIServiceToFunctions` component.

    The example below demonstrates how to use the `OpenAPIServiceConnector` to invoke a method on a https://serper.dev/
    service specified via OpenAPI specification.

    Note, however, that `OpenAPIServiceConnector` is usually not meant to be used directly, but rather as part of a
    pipeline that includes the `OpenAPIServiceToFunctions` component and an `OpenAIChatGenerator` component using LLM
    with the function calling capabilities. In the example below we build the tool call by hand, but in a
    real-world scenario, the tool call would usually be generated by the `OpenAIChatGenerator` component.

    Usage example:

    ```python
    import json
    import requests

    from haystack.components.connectors import OpenAPIServiceConnector
    from haystack.dataclasses import ChatMessage, ToolCall


    tool_call = ToolCall(tool_name="search", arguments={"q": "Why was Sam Altman ousted from OpenAI?"})

    serper_token = <your_serper_dev_token>
    serperdev_openapi_spec = json.loads(requests.get("https://bit.ly/serper_dev_spec").text)
    service_connector = OpenAPIServiceConnector()
    result = service_connector.run(messages=[ChatMessage.from_assistant(tool_calls=[tool_call])],
                                   service_openapi_spec=serperdev_openapi_spec, service_credentials=serper_token)
    print(result)

    >> {'service_response': [ChatMessage(_role=<ChatRole.USER: 'user'>, _content=[TextContent(text=
    >> '{"searchParameters": {"q": "Why was Sam Altman ousted from OpenAI?",
    >> "type": "search", "engine": "google"}, "answerBox": {"snippet": "Concerns over AI safety and OpenAI\'s role
    >> in protecting were at the center of Altman\'s brief ouster from the company."...
    ```

    """

    def __init__(self, ssl_verify: Optional[Union[bool, str]] = None):
        """
        Initializes the OpenAPIServiceConnector instance

        :param ssl_verify: Decide if to use SSL verification to the requests or not,
        in case a string is passed, will be used as the CA.
        """
        openapi_imports.check()
        self.ssl_verify = ssl_verify

    # NOTE(review): the declared output type is kept as-is for pipeline compatibility, although the value
    # actually returned under `service_response` is a list of `ChatMessage` objects.
    @component.output_types(service_response=Dict[str, Any])
    def run(
        self,
        messages: List[ChatMessage],
        service_openapi_spec: Dict[str, Any],
        service_credentials: Optional[Union[dict, str]] = None,
    ) -> Dict[str, List[ChatMessage]]:
        """
        Processes a list of chat messages to invoke a method on an OpenAPI service.

        It parses the last message in the list, expecting it to contain tool calls.

        :param messages: A list of `ChatMessage` objects containing the messages to be processed. The last message
        should contain the tool calls.
        :param service_openapi_spec: The OpenAPI JSON specification object of the service to be invoked. All the refs
        should already be resolved.
        :param service_credentials: The credentials to be used for authentication with the service.
        Currently, only the http and apiKey OpenAPI security schemes are supported.

        :return: A dictionary with the following keys:
            - `service_response`:  a list of `ChatMessage` objects, each containing the response from the service. The
                                   response is in JSON format, and the `content` attribute of the `ChatMessage` contains
                                   the JSON string.

        :raises ValueError: If no messages are provided, if the last message is not from the assistant or if it
        does not contain tool calls.
        """
        # Fail with an explicit error instead of an opaque IndexError on `messages[-1]`.
        if not messages:
            raise ValueError("No messages were provided. At least one ChatMessage with tool calls is required.")

        last_message = messages[-1]
        if not last_message.is_from(ChatRole.ASSISTANT):
            raise ValueError(f"{last_message} is not from the assistant.")

        tool_calls = last_message.tool_calls
        if not tool_calls:
            raise ValueError(f"The provided ChatMessage has no tool calls.\nChatMessage: {last_message}")

        function_payloads = [
            {"arguments": tool_call.arguments, "name": tool_call.tool_name} for tool_call in tool_calls
        ]

        # instantiate the OpenAPI service for the given specification
        openapi_service = OpenAPI(service_openapi_spec, ssl_verify=self.ssl_verify)
        self._authenticate_service(openapi_service, service_credentials)

        response_messages = []
        for method_invocation_descriptor in function_payloads:
            service_response = self._invoke_method(openapi_service, method_invocation_descriptor)
            # openapi3 would normally parse the JSON service response into model objects, which is not our focus
            # at the moment. `_invoke_method` therefore asks for the raw response (`raw_response=True`), which
            # helps us avoid issues related to (de)serialization: the raw data is serialized into a string and
            # that string is used to create a ChatMessage object.
            response_messages.append(ChatMessage.from_user(json.dumps(service_response)))

        return {"service_response": response_messages}

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(self, ssl_verify=self.ssl_verify)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "OpenAPIServiceConnector":
        """
        Deserializes the component from a dictionary.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The deserialized component.
        """
        return default_from_dict(cls, data)

    def _authenticate_service(self, openapi_service: "OpenAPI", credentials: Optional[Union[dict, str]] = None):
        """
        Authentication with an OpenAPI service.

        Authenticates with the OpenAPI service if required, supporting both single (str) and multiple
        authentication methods (dict).

        OpenAPI spec v3 supports the following security schemes:
        http – for Basic, Bearer and other HTTP authentications schemes
        apiKey – for API keys and cookie authentication
        oauth2 – for OAuth 2
        openIdConnect – for OpenID Connect Discovery

        Currently, only the http and apiKey schemes are supported. Multiple security schemes can be defined in the
        OpenAPI spec, and the credentials should be provided as a dictionary with keys matching the security scheme
        names. If only one security scheme is defined, the credentials can be provided as a simple string.

        The first supported scheme for which credentials are available is used; schemes without matching
        credentials are skipped, so the outcome does not depend on the order in which the spec lists them.

        :param openapi_service: The OpenAPI service instance.
        :param credentials: Credentials for authentication, which can be either a string (e.g. token) or a dictionary
        with keys matching the authentication method names.
        :raises ValueError: If authentication fails, is not found, or if appropriate credentials are missing.
        """
        # Nothing to do for services that do not declare any security scheme.
        if not openapi_service.raw_element.get("components", {}).get("securitySchemes"):
            return

        service_name = openapi_service.info.title
        if not credentials:
            raise ValueError(f"Service {service_name} requires authentication but no credentials were provided.")

        # a dictionary of security schemes defined in the OpenAPI spec
        # each key is the name of the security scheme, and the value is the scheme definition
        security_schemes = openapi_service.components.securitySchemes.raw_element
        supported_schemes = ["http", "apiKey"]  # todo: add support for oauth2 and openIdConnect

        for scheme_name, scheme in security_schemes.items():
            if scheme["type"] not in supported_schemes:
                continue

            auth_credentials = None
            if isinstance(credentials, str):
                auth_credentials = credentials
            elif isinstance(credentials, dict):
                auth_credentials = credentials.get(scheme_name)

            if auth_credentials:
                openapi_service.authenticate(scheme_name, auth_credentials)
                return
            # No credentials for this scheme: keep looking, another supported scheme may have some.

        raise ValueError(
            f"Service {service_name} requires authentication but no credentials were provided "
            f"for it. Check the service configuration and credentials."
        )

    def _invoke_method(self, openapi_service: "OpenAPI", method_invocation_descriptor: Dict[str, Any]) -> Any:
        """
        Invokes the specified method on the OpenAPI service.

        The method name and arguments are passed in the method_invocation_descriptor.

        Arguments are matched by name against the operation's declared parameters and against the properties of
        its `application/json` request body schema. An argument counts as provided whenever its value is not
        `None`, so falsy values such as `0`, `False` or an empty string are forwarded to the service.

        :param openapi_service: The OpenAPI service instance.
        :param method_invocation_descriptor: The method name and arguments to be passed to the method. The payload
        should contain the method name (key: "name") and the arguments (key: "arguments"). The name is a string, and
        the arguments are a dictionary of key-value pairs.
        :return: A service JSON response.
        :raises ValueError: If the descriptor lacks a method name or arguments, or if a required parameter is
        missing.
        :raises RuntimeError: If the method is not found or invocation fails.
        """
        name = method_invocation_descriptor.get("name")
        invocation_arguments = copy(method_invocation_descriptor.get("arguments", {}))
        if not name or not invocation_arguments:
            raise ValueError(
                f"Invalid function calling descriptor: {method_invocation_descriptor} . It should contain "
                f"a method name and arguments."
            )

        # openapi3 specific method to call the operation, do we have it?
        method_to_call = getattr(openapi_service, f"call_{name}", None)
        if not callable(method_to_call):
            raise RuntimeError(f"Operation {name} not found in OpenAPI specification {openapi_service.info.title}")

        # get the operation reference from the method_to_call
        operation = method_to_call.operation.__self__
        operation_dict = operation.raw_element

        # Pack URL/query parameters under "parameters" key
        method_call_params: Dict[str, Dict[str, Any]] = defaultdict(dict)
        parameters = operation_dict.get("parameters", [])
        request_body = operation_dict.get("requestBody", {})

        for param in parameters:
            param_name = param["name"]
            param_value = invocation_arguments.get(param_name)
            # Compare against None rather than relying on truthiness: 0, False and "" are valid values.
            if param_value is not None:
                method_call_params["parameters"][param_name] = param_value
            elif param.get("required", False):
                raise ValueError(f"Missing parameter: '{param_name}' required for the '{name}' operation.")

        # Pack request body parameters under "data" key
        if request_body:
            schema = request_body.get("content", {}).get("application/json", {}).get("schema", {})
            required_params = schema.get("required", [])
            for param_name in schema.get("properties", {}):
                param_value = invocation_arguments.get(param_name)
                if param_value is not None:
                    method_call_params["data"][param_name] = param_value
                elif param_name in required_params:
                    raise ValueError(
                        f"Missing requestBody parameter: '{param_name}' required for the '{name}' operation."
                    )
        # call the underlying service REST API with the parameters
        return method_to_call(**method_call_params, raw_response=True)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc