• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 13972131258

20 Mar 2025 02:43PM UTC coverage: 90.021% (-0.03%) from 90.054%
13972131258

Pull #9069

github

web-flow
Merge 8371761b0 into 67ab3788e
Pull Request #9069: refactor!: `ChatMessage` serialization-deserialization updates

9833 of 10923 relevant lines covered (90.02%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.33
haystack/components/connectors/openapi_service.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6
from collections import defaultdict
1✔
7
from copy import copy
1✔
8
from typing import Any, Dict, List, Optional, Union
1✔
9

10
from haystack import component, default_from_dict, default_to_dict
1✔
11
from haystack.dataclasses import ChatMessage, ChatRole
1✔
12
from haystack.lazy_imports import LazyImport
1✔
13

14
with LazyImport("Run 'pip install openapi3'") as openapi_imports:
1✔
15
    import requests
1✔
16
    from openapi3 import OpenAPI
1✔
17
    from openapi3.errors import UnexpectedResponseError
1✔
18
    from openapi3.paths import Operation
1✔
19

20
    # Patch the request method to add support for the proper raw_response handling
    # If you see that https://github.com/Dorthu/openapi3/pull/124/
    # is merged, we can remove this patch - notify authors of this code
    def patch_request(
        self,
        base_url: str,
        *,
        data: Optional[Any] = None,
        parameters: Optional[Dict[str, Any]] = None,
        raw_response: bool = False,
        security: Optional[Dict[str, str]] = None,
        session: Optional[Any] = None,
        verify: Union[bool, str] = True,
    ) -> Optional[Any]:
        """
        Sends an HTTP request as described by this path.

        Replacement for `openapi3.paths.Operation.request`; `self` is the `Operation` being invoked.

        :param base_url: The URL to append this operation's path to when making
                         the call.
        :param data: The request body to send.
        :param parameters: The parameters used to create the path.
        :param raw_response: If true, return the raw response (decoded JSON for JSON content types, the response
                             text otherwise) instead of validating and extrapolating it into a model object.
        :param security: The security scheme to use, and the values it needs to
                         process successfully.
        :param session: A persistent request session. Defaults to the operation's own session.
        :param verify: If we should do an ssl verification on the request or not.
                       In case str was provided, will use that as the CA.
        :return: The response data, either raw or processed depending on raw_response flag, or None if the
                 matched response in the spec defines no content.
        :raises ValueError: If none of the provided security schemes is accepted by this operation, or if a
                            required request body was not provided.
        :raises UnexpectedResponseError: If the response status code matches neither a documented response nor
                                         a "default" response.
        :raises RuntimeError: If the response Content-Type is not among the documented media types.
        :raises NotImplementedError: If a non-raw response has a content type other than application/json.
        """
        # Set request method (e.g. 'GET')
        self._request = requests.Request(self.path[-1])

        # Set self._request.url to base_url w/ path
        self._request.url = base_url + self.path[-2]

        parameters = parameters or {}
        security = security or {}

        if security and self.security:
            # self.security is the list of requirements this operation accepts. Apply every provided scheme the
            # operation knows about, and fail only if none of the provided schemes matched any requirement.
            satisfied = False
            for scheme, value in security.items():
                for requirement in self.security:
                    if requirement.name == scheme:
                        satisfied = True
                        self._request_handle_secschemes(requirement, value)

            if not satisfied:
                # self.security is a list, so collect the accepted names from its items (it has no .keys()).
                accepted = ", ".join(requirement.name for requirement in self.security)
                raise ValueError(f"No security requirement satisfied (accepts {accepted})")

        if self.requestBody:
            if self.requestBody.required and data is None:
                err_msg = "Request Body is required but none was provided."
                raise ValueError(err_msg)

            self._request_handle_body(data)

        self._request_handle_parameters(parameters)

        if session is None:
            session = self._session

        # send the prepared request
        result = session.send(self._request.prepare(), verify=verify)

        # spec enforces these are strings
        status_code = str(result.status_code)

        # find the response model in spec we received
        expected_response = None
        if status_code in self.responses:
            expected_response = self.responses[status_code]
        elif "default" in self.responses:
            expected_response = self.responses["default"]

        if expected_response is None:
            raise UnexpectedResponseError(result, self)

        # if we got back a valid response code (or there was a default) and no
        # response content was expected, return None
        if expected_response.content is None:
            return None

        # A response without a Content-Type header is treated as having an empty media type instead of
        # failing with a KeyError; it then falls through to the raw-text or "unexpected Content-Type" paths.
        raw_content_type = result.headers.get("Content-Type", "")
        content_type = raw_content_type
        if ";" in content_type:
            # if the content type that came in included an encoding, we'll ignore
            # it for now (requests has already parsed it for us) and only look at
            # the MIME type when determining if an expected content type was returned.
            content_type = content_type.split(";")[0].strip()

        expected_media = expected_response.content.get(content_type, None)

        # If raw_response is True, return the raw text or json based on content type.
        # Media types are case-insensitive, so compare in lowercase (as the model path below does).
        if raw_response:
            if "application/json" in content_type.lower():
                return result.json()
            return result.text

        if expected_media is None and "/" in content_type:
            # accept media type ranges in the spec. the most specific matching
            # type should always be chosen, but if we do not have a match here
            # a generic range should be accepted if one if provided
            # https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#response-object

            generic_type = content_type.split("/")[0] + "/*"
            expected_media = expected_response.content.get(generic_type, None)

        if expected_media is None:
            err_msg = """Unexpected Content-Type {} returned for operation {} \
                         (expected one of {})"""
            err_var = raw_content_type, self.operationId, ",".join(expected_response.content.keys())

            raise RuntimeError(err_msg.format(*err_var))

        if content_type.lower() == "application/json":
            return expected_media.schema.model(result.json())

        raise NotImplementedError("Only application/json content type is supported")
141

142
    # Apply the patch
143
    # Replace the method on the openapi3 Operation class itself, so every Operation instance (including the
    # call_<operationId> helpers that delegate to it) uses patch_request and its raw_response support.
    Operation.request = patch_request
1✔
144

145

146
@component
class OpenAPIServiceConnector:
    """
    A component which connects the Haystack framework to OpenAPI services.

    The `OpenAPIServiceConnector` component connects the Haystack framework to OpenAPI services, enabling it to call
    operations as defined in the OpenAPI specification of the service.

    It integrates with the `ChatMessage` dataclass: the tool calls of the last (assistant) message determine which
    operations are called and with which parameters. Each tool call provides the operation name (its tool name) and
    the arguments to pass. The operations are then invoked on the OpenAPI service, and each response from the
    service is returned as a `ChatMessage` containing the JSON-serialized response.

    Before using this component, users usually resolve service endpoint parameters with the help of the
    `OpenAPIServiceToFunctions` component.

    The example below demonstrates how to use the `OpenAPIServiceConnector` to invoke a method on a https://serper.dev/
    service specified via OpenAPI specification.

    Note, however, that `OpenAPIServiceConnector` is usually not meant to be used directly, but rather as part of a
    pipeline that includes the `OpenAPIServiceToFunctions` component and an `OpenAIChatGenerator` component using LLM
    with the function calling capabilities. In the example below we build the tool call directly, but in a
    real-world scenario, the tool calls would usually be generated by the `OpenAIChatGenerator` component.

    Usage example:

    ```python
    import json
    import requests

    from haystack.components.connectors import OpenAPIServiceConnector
    from haystack.dataclasses import ChatMessage, ToolCall


    tool_call = ToolCall(tool_name="search", arguments={"q": "Why was Sam Altman ousted from OpenAI?"})

    serper_token = "<your_serper_dev_token>"
    serperdev_openapi_spec = json.loads(requests.get("https://bit.ly/serper_dev_spec").text)
    service_connector = OpenAPIServiceConnector()
    result = service_connector.run(messages=[ChatMessage.from_assistant(tool_calls=[tool_call])],
                                   service_openapi_spec=serperdev_openapi_spec, service_credentials=serper_token)
    print(result)

    >> {'service_response': [ChatMessage(_role=<ChatRole.USER: 'user'>, _content=[TextContent(text='{"searchParameters":
    >> {"q": "Why was Sam Altman ousted from OpenAI?", "type": "search", "engine": "google"}, "answerBox": {"snippet":
    >> "Concerns over AI safety and OpenAI\'s role in protecting were at the center of Altman\'s brief ouster from the
    >> company."...
    ```
    """

    def __init__(self, ssl_verify: Optional[Union[bool, str]] = None):
        """
        Initializes the OpenAPIServiceConnector instance.

        :param ssl_verify: Whether to use SSL verification for the requests. If a string is passed, it is used as
            the CA.
        """
        openapi_imports.check()
        self.ssl_verify = ssl_verify

    @component.output_types(service_response=List[ChatMessage])
    def run(
        self,
        messages: List[ChatMessage],
        service_openapi_spec: Dict[str, Any],
        service_credentials: Optional[Union[dict, str]] = None,
    ) -> Dict[str, List[ChatMessage]]:
        """
        Processes a list of chat messages to invoke a method on an OpenAPI service.

        It parses the last message in the list, expecting it to contain tool calls.

        :param messages: A list of `ChatMessage` objects containing the messages to be processed. The last message
        should contain the tool calls.
        :param service_openapi_spec: The OpenAPI JSON specification object of the service to be invoked. All the refs
        should already be resolved.
        :param service_credentials: The credentials to be used for authentication with the service.
        Currently, only the http and apiKey OpenAPI security schemes are supported.

        :return: A dictionary with the following keys:
            - `service_response`:  a list of `ChatMessage` objects (user role), one per tool call, each containing the
                                   response from the service serialized as a JSON string in the message text.

        :raises ValueError: If `messages` is empty, if the last message is not from the assistant, or if it does not
            contain tool calls.
        """
        # Fail with the documented ValueError rather than a bare IndexError on an empty list.
        if not messages:
            raise ValueError("No messages provided. The last message must be an assistant message with tool calls.")

        last_message = messages[-1]
        if not last_message.is_from(ChatRole.ASSISTANT):
            raise ValueError(f"{last_message} is not from the assistant.")

        tool_calls = last_message.tool_calls
        if not tool_calls:
            raise ValueError(f"The provided ChatMessage has no tool calls.\nChatMessage: {last_message}")

        # One invocation descriptor per tool call; the tool name selects the `call_<name>` operation on the service.
        function_payloads = [
            {"arguments": tool_call.arguments, "name": tool_call.tool_name} for tool_call in tool_calls
        ]

        # instantiate the OpenAPI service for the given specification
        openapi_service = OpenAPI(service_openapi_spec, ssl_verify=self.ssl_verify)
        self._authenticate_service(openapi_service, service_credentials)

        response_messages = []
        for method_invocation_descriptor in function_payloads:
            # _invoke_method calls the operation with raw_response=True, so openapi3 hands back the decoded JSON
            # (or raw text) instead of its own model objects. That plain data is serialized straight to a JSON
            # string, which avoids (de)serialization issues with the openapi3 models.
            service_response = self._invoke_method(openapi_service, method_invocation_descriptor)
            response_messages.append(ChatMessage.from_user(json.dumps(service_response)))

        return {"service_response": response_messages}

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(self, ssl_verify=self.ssl_verify)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "OpenAPIServiceConnector":
        """
        Deserializes the component from a dictionary.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The deserialized component.
        """
        return default_from_dict(cls, data)

    def _authenticate_service(self, openapi_service: "OpenAPI", credentials: Optional[Union[dict, str]] = None):
        """
        Authentication with an OpenAPI service.

        Authenticates with the OpenAPI service if required, supporting both single (str) and multiple
        authentication methods (dict).

        OpenAPI spec v3 supports the following security schemes:
        http – for Basic, Bearer and other HTTP authentication schemes
        apiKey – for API keys and cookie authentication
        oauth2 – for OAuth 2
        openIdConnect – for OpenID Connect Discovery

        Currently, only the http and apiKey schemes are supported. Multiple security schemes can be defined in the
        OpenAPI spec, and the credentials should be provided as a dictionary with keys matching the security scheme
        names. A plain string credential is used for the first supported scheme defined in the spec.

        The service is authenticated with the first supported scheme (in spec order) for which credentials are
        available; supported schemes without matching credentials are skipped.

        :param openapi_service: The OpenAPI service instance.
        :param credentials: Credentials for authentication, which can be either a string (e.g. token) or a dictionary
        with keys matching the authentication method names.
        :raises ValueError: If the service requires authentication and no credentials were provided, or none of the
            supported security schemes has matching credentials.
        """
        if not openapi_service.raw_element.get("components", {}).get("securitySchemes"):
            # The spec defines no security schemes: nothing to authenticate.
            return

        service_name = openapi_service.info.title
        if not credentials:
            raise ValueError(f"Service {service_name} requires authentication but no credentials were provided.")

        # a dictionary of security schemes defined in the OpenAPI spec
        # each key is the name of the security scheme, and the value is the scheme definition
        security_schemes = openapi_service.components.securitySchemes.raw_element
        supported_schemes = ["http", "apiKey"]  # todo: add support for oauth2 and openIdConnect

        for scheme_name, scheme in security_schemes.items():
            if scheme["type"] not in supported_schemes:
                continue

            if isinstance(credentials, str):
                auth_credentials = credentials
            elif isinstance(credentials, dict):
                auth_credentials = credentials.get(scheme_name)
            else:
                auth_credentials = None

            if auth_credentials:
                openapi_service.authenticate(scheme_name, auth_credentials)
                return
            # No credentials for this scheme: keep looking, a later scheme may have them.

        raise ValueError(
            f"Service {service_name} requires authentication but no credentials were provided "
            f"for it. Check the service configuration and credentials."
        )

    def _invoke_method(self, openapi_service: "OpenAPI", method_invocation_descriptor: Dict[str, Any]) -> Any:
        """
        Invokes the specified method on the OpenAPI service.

        The method name and arguments are passed in the method_invocation_descriptor.

        :param openapi_service: The OpenAPI service instance.
        :param method_invocation_descriptor: The method name and arguments to be passed to the method. The payload
        should contain the method name (key: "name") and the arguments (key: "arguments"). The name is a string, and
        the arguments are a dictionary of key-value pairs.
        :return: The service response obtained with `raw_response=True`: the decoded JSON body, or the raw text for
            non-JSON responses.
        :raises ValueError: If the descriptor lacks a name or arguments, or a required parameter is missing.
        :raises RuntimeError: If the method is not found or invocation fails.
        """
        name = method_invocation_descriptor.get("name")
        invocation_arguments = copy(method_invocation_descriptor.get("arguments", {}))
        if not name or not invocation_arguments:
            raise ValueError(
                f"Invalid function calling descriptor: {method_invocation_descriptor} . It should contain "
                f"a method name and arguments."
            )

        # openapi3 specific method to call the operation, do we have it?
        method_to_call = getattr(openapi_service, f"call_{name}", None)
        if not callable(method_to_call):
            raise RuntimeError(f"Operation {name} not found in OpenAPI specification {openapi_service.info.title}")

        # get the operation reference from the method_to_call
        operation = method_to_call.operation.__self__
        operation_dict = operation.raw_element

        # URL/query parameters are packed under the "parameters" key, request body fields under "data"
        method_call_params: Dict[str, Dict[str, Any]] = defaultdict(dict)
        parameters = operation_dict.get("parameters", [])
        request_body = operation_dict.get("requestBody", {})

        for param in parameters:
            param_name = param["name"]
            param_value = invocation_arguments.get(param_name)
            # Compare against None: falsy values such as 0, False or "" are legitimate arguments.
            if param_value is not None:
                method_call_params["parameters"][param_name] = param_value
            elif param.get("required", False):
                raise ValueError(f"Missing parameter: '{param_name}' required for the '{name}' operation.")

        # Pack request body parameters under "data" key
        if request_body:
            schema = request_body.get("content", {}).get("application/json", {}).get("schema", {})
            required_params = schema.get("required", [])
            for param_name in schema.get("properties", {}):
                param_value = invocation_arguments.get(param_name)
                if param_value is not None:
                    method_call_params["data"][param_name] = param_value
                elif param_name in required_params:
                    raise ValueError(
                        f"Missing requestBody parameter: '{param_name}' required for the '{name}' operation."
                    )

        # call the underlying service REST API with the parameters
        return method_to_call(**method_call_params, raw_response=True)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc