• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 13259218501

11 Feb 2025 09:00AM UTC coverage: 91.459% (-1.3%) from 92.709%
13259218501

Pull #8829

github

web-flow
Merge 427e76339 into ad90e106a
Pull Request #8829: fix: Look through all streaming chunks for tools calls

9413 of 10292 relevant lines covered (91.46%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.59
haystack/components/connectors/openapi_service.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6
from collections import defaultdict
1✔
7
from copy import copy
1✔
8
from typing import Any, Dict, List, Optional, Union
1✔
9

10
from haystack import component, default_from_dict, default_to_dict, logging
1✔
11
from haystack.dataclasses import ChatMessage, ChatRole
1✔
12
from haystack.lazy_imports import LazyImport
1✔
13

14
logger = logging.getLogger(__name__)
1✔
15

16
with LazyImport("Run 'pip install openapi3'") as openapi_imports:
1✔
17
    import requests
1✔
18
    from openapi3 import OpenAPI
1✔
19
    from openapi3.errors import UnexpectedResponseError
1✔
20
    from openapi3.paths import Operation
1✔
21

22
    # Patch the request method to add support for the proper raw_response handling
    # If you see that https://github.com/Dorthu/openapi3/pull/124/
    # is merged, we can remove this patch - notify authors of this code
    def patch_request(
        self,
        base_url: str,
        *,
        data: Optional[Any] = None,
        parameters: Optional[Dict[str, Any]] = None,
        raw_response: bool = False,
        security: Optional[Dict[str, str]] = None,
        session: Optional[Any] = None,
        verify: Union[bool, str] = True,
    ) -> Optional[Any]:
        """
        Sends an HTTP request as described by this path.

        This function replaces `openapi3.paths.Operation.request` (see the assignment below), so `self` is the
        `Operation` being invoked.

        :param base_url: The URL to append this operation's path to when making
                         the call.
        :param data: The request body to send.
        :param parameters: The parameters used to create the path.
        :param raw_response: If true, return the raw response instead of validating
                             and exterpolating it.
        :param security: The security scheme to use, and the values it needs to
                         process successfully.
        :param session: A persistent request session. Defaults to the operation's own session.
        :param verify: If we should do an ssl verification on the request or not.
                       In case str was provided, will use that as the CA.
        :return: `None` if the matched response declares no content. Otherwise, with `raw_response` the decoded
            JSON body (for JSON content types) or the response text; without it, the response parsed into the
            schema model (only `application/json` is supported).
        :raises ValueError: If none of the provided security schemes is accepted by the operation, or if a required
            request body is missing.
        :raises UnexpectedResponseError: If the status code matches neither a declared response nor `default`.
        :raises RuntimeError: If the returned Content-Type is not declared for the matched response.
        :raises NotImplementedError: If a non-raw response with a non-JSON content type has to be parsed.
        """
        # Set request method (e.g. 'GET') and the URL (base_url + path template) from the operation's path.
        self._request = requests.Request(self.path[-1])
        self._request.url = base_url + self.path[-2]

        parameters = parameters or {}
        security = security or {}

        if security and self.security:
            # Apply every provided scheme the operation accepts; fail only when none of them matched.
            # (self.security is iterated as a sequence of requirements, each exposing `.name`.)
            satisfied = False
            for scheme, value in security.items():
                for requirement in self.security:
                    if requirement.name == scheme:
                        satisfied = True
                        self._request_handle_secschemes(requirement, value)

            if not satisfied:
                accepted = ", ".join(requirement.name for requirement in self.security)
                raise ValueError(f"No security requirement satisfied (accepts {accepted})")

        if self.requestBody:
            if self.requestBody.required and data is None:
                raise ValueError("Request Body is required but none was provided.")
            self._request_handle_body(data)

        self._request_handle_parameters(parameters)

        if session is None:
            session = self._session

        # send the prepared request
        result = session.send(self._request.prepare(), verify=verify)

        # the spec keys responses by status code strings
        status_code = str(result.status_code)

        # find the response model in spec we received, falling back to "default"
        expected_response = None
        if status_code in self.responses:
            expected_response = self.responses[status_code]
        elif "default" in self.responses:
            expected_response = self.responses["default"]

        if expected_response is None:
            raise UnexpectedResponseError(result, self)

        # if we got back a valid response code (or there was a default) and no
        # response content was expected, return None
        if expected_response.content is None:
            return None

        # A missing Content-Type header is treated as an empty type instead of raising KeyError.
        raw_content_type = result.headers.get("Content-Type", "")
        # Any parameters (e.g. "; charset=utf-8") are ignored: requests has already used them to decode the body,
        # and only the MIME type is compared against the spec.
        content_type = raw_content_type.split(";")[0].strip()

        expected_media = expected_response.content.get(content_type, None)

        # If raw_response is True, return the raw text or json based on content type
        if raw_response:
            if "application/json" in content_type:
                return result.json()
            return result.text

        if expected_media is None and "/" in content_type:
            # accept media type ranges in the spec. the most specific matching
            # type should always be chosen, but if we do not have a match here
            # a generic range should be accepted if one if provided
            # https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#response-object
            generic_type = content_type.split("/")[0] + "/*"
            expected_media = expected_response.content.get(generic_type, None)

        if expected_media is None:
            expected_types = ",".join(expected_response.content.keys())
            raise RuntimeError(
                f"Unexpected Content-Type {raw_content_type} returned for operation {self.operationId} "
                f"(expected one of {expected_types})"
            )

        if content_type.lower() == "application/json":
            return expected_media.schema.model(result.json())

        raise NotImplementedError("Only application/json content type is supported")

    # Apply the patch
    Operation.request = patch_request
146

147

148
@component
class OpenAPIServiceConnector:
    """
    A component which connects the Haystack framework to OpenAPI services.

    The `OpenAPIServiceConnector` component connects the Haystack framework to OpenAPI services, enabling it to call
    operations as defined in the OpenAPI specification of the service.

    It integrates with the `ChatMessage` dataclass: the tool calls of the last (assistant) message determine which
    operations are called and which arguments are passed to them. Each tool call names an operation of the OpenAPI
    specification, and its arguments are mapped onto the operation's parameters and JSON request body. The response
    of every call is returned as a `ChatMessage`.

    Before using this component, users usually resolve service endpoint parameters with the help of the
    `OpenAPIServiceToFunctions` component.

    The example below demonstrates how to use the `OpenAPIServiceConnector` to invoke a method on a https://serper.dev/
    service specified via OpenAPI specification.

    Note, however, that `OpenAPIServiceConnector` is usually not meant to be used directly, but rather as part of a
    pipeline that includes the `OpenAPIServiceToFunctions` component and an `OpenAIChatGenerator` component using an
    LLM with function calling capabilities. In the example below we build the tool call by hand, but in a real-world
    scenario it would usually be generated by the `OpenAIChatGenerator` component.

    Usage example:

    ```python
    import json
    import requests

    from haystack.components.connectors import OpenAPIServiceConnector
    from haystack.dataclasses import ChatMessage, ToolCall


    tool_call = ToolCall(tool_name="search", arguments={"q": "Why was Sam Altman ousted from OpenAI?"})

    serper_token = <your_serper_dev_token>
    serperdev_openapi_spec = json.loads(requests.get("https://bit.ly/serper_dev_spec").text)
    service_connector = OpenAPIServiceConnector()
    result = service_connector.run(messages=[ChatMessage.from_assistant(tool_calls=[tool_call])],
                                   service_openapi_spec=serperdev_openapi_spec, service_credentials=serper_token)
    print(result)

    >> {'service_response': [ChatMessage(...)]}
    ```

    Each returned message is a user message whose text is the JSON-encoded service response, for example
    `{"searchParameters": {"q": "Why was Sam Altman ousted from OpenAI?", "type": "search", "engine": "google"},
    "answerBox": {...}, ...}`.
    """

    def __init__(self, ssl_verify: Optional[Union[bool, str]] = None):
        """
        Initializes the OpenAPIServiceConnector instance.

        :param ssl_verify: Whether to use SSL verification for the requests sent to the service. If a string is
            passed, it is used as the CA. The value is forwarded to the openapi3 `OpenAPI` client.
        """
        openapi_imports.check()
        self.ssl_verify = ssl_verify

    # NOTE(review): `service_response` is actually a List[ChatMessage] (see `run`). The declared Dict[str, Any] is
    # kept so existing pipeline connections keep validating; confirm before aligning the declared type.
    @component.output_types(service_response=Dict[str, Any])
    def run(
        self,
        messages: List[ChatMessage],
        service_openapi_spec: Dict[str, Any],
        service_credentials: Optional[Union[dict, str]] = None,
    ) -> Dict[str, List[ChatMessage]]:
        """
        Processes a list of chat messages to invoke a method on an OpenAPI service.

        It parses the last message in the list, expecting it to contain tool calls.

        :param messages: A list of `ChatMessage` objects containing the messages to be processed. The last message
        should contain the tool calls.
        :param service_openapi_spec: The OpenAPI JSON specification object of the service to be invoked. All the refs
        should already be resolved.
        :param service_credentials: The credentials to be used for authentication with the service.
        Currently, only the http and apiKey OpenAPI security schemes are supported.

        :return: A dictionary with the following keys:
            - `service_response`:  a list of `ChatMessage` objects (one per tool call, created with
                                   `ChatMessage.from_user`), each containing the JSON-encoded response from the
                                   service as a string.

        :raises ValueError: If `messages` is empty, if the last message is not from the assistant or does not
            contain tool calls, if authentication cannot be performed, or if a tool call is invalid or misses a
            required parameter.
        :raises RuntimeError: If a tool call names an operation that is not in the OpenAPI specification.
        """
        if not messages:
            raise ValueError("No messages were provided; the last message should contain the tool calls.")

        last_message = messages[-1]
        if not last_message.is_from(ChatRole.ASSISTANT):
            raise ValueError(f"{last_message} is not from the assistant.")

        tool_calls = last_message.tool_calls
        if not tool_calls:
            raise ValueError(f"The provided ChatMessage has no tool calls.\nChatMessage: {last_message}")

        function_payloads = [{"arguments": tool_call.arguments, "name": tool_call.tool_name} for tool_call in tool_calls]

        # instantiate the OpenAPI service for the given specification
        openapi_service = OpenAPI(service_openapi_spec, ssl_verify=self.ssl_verify)
        self._authenticate_service(openapi_service, service_credentials)

        response_messages = []
        for method_invocation_descriptor in function_payloads:
            service_response = self._invoke_method(openapi_service, method_invocation_descriptor)
            # `_invoke_method` requests a raw response, so `service_response` is already plain data (the decoded
            # JSON body, or the response text for non-JSON content types) rather than an openapi3 model object.
            # Serializing it with json.dumps gives a plain string for the ChatMessage content.
            response_messages.append(ChatMessage.from_user(json.dumps(service_response)))

        return {"service_response": response_messages}

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(self, ssl_verify=self.ssl_verify)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "OpenAPIServiceConnector":
        """
        Deserializes the component from a dictionary.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The deserialized component.
        """
        return default_from_dict(cls, data)

    def _authenticate_service(self, openapi_service: "OpenAPI", credentials: Optional[Union[dict, str]] = None):
        """
        Authentication with an OpenAPI service.

        Authenticates with the OpenAPI service if required, supporting both single (str) and multiple
        authentication methods (dict).

        OpenAPI spec v3 supports the following security schemes:
        http – for Basic, Bearer and other HTTP authentications schemes
        apiKey – for API keys and cookie authentication
        oauth2 – for OAuth 2
        openIdConnect – for OpenID Connect Discovery

        Currently, only the http and apiKey schemes are supported. Multiple security schemes can be defined in the
        OpenAPI spec, and the credentials should be provided as a dictionary with keys matching the security scheme
        names. The first supported scheme that has credentials is used. If only one security scheme is defined,
        the credentials can be provided as a simple string; a string is always applied to the first supported
        scheme.

        :param openapi_service: The OpenAPI service instance.
        :param credentials: Credentials for authentication, which can be either a string (e.g. token) or a dictionary
        with keys matching the authentication method names.
        :raises ValueError: If authentication fails, is not found, or if appropriate credentials are missing.
        """
        if not openapi_service.raw_element.get("components", {}).get("securitySchemes"):
            # The spec declares no security schemes, so there is nothing to authenticate.
            return

        service_name = openapi_service.info.title
        if not credentials:
            raise ValueError(f"Service {service_name} requires authentication but no credentials were provided.")

        # a dictionary of security schemes defined in the OpenAPI spec
        # each key is the name of the security scheme, and the value is the scheme definition
        security_schemes = openapi_service.components.securitySchemes.raw_element
        supported_schemes = ["http", "apiKey"]  # todo: add support for oauth2 and openIdConnect
        candidate_schemes = [
            scheme_name
            for scheme_name, scheme in security_schemes.items()
            if scheme.get("type") in supported_schemes
        ]

        # Try every supported scheme instead of giving up at the first one without credentials, so a
        # credentials dict keyed by a later scheme still authenticates.
        for scheme_name in candidate_schemes:
            if isinstance(credentials, str):
                auth_credentials = credentials
            elif isinstance(credentials, dict):
                auth_credentials = credentials.get(scheme_name)
            else:
                auth_credentials = None
            if auth_credentials:
                openapi_service.authenticate(scheme_name, auth_credentials)
                return

        if candidate_schemes:
            required = " or ".join(candidate_schemes)
            raise ValueError(
                f"Service {service_name} requires {required} security scheme but no "
                f"credentials were provided for it. Check the service configuration and credentials."
            )
        raise ValueError(
            f"Service {service_name} requires authentication but no credentials were provided "
            f"for it. Check the service configuration and credentials."
        )

    def _invoke_method(self, openapi_service: "OpenAPI", method_invocation_descriptor: Dict[str, Any]) -> Any:
        """
        Invokes the specified method on the OpenAPI service.

        The method name and arguments are passed in the method_invocation_descriptor.

        :param openapi_service: The OpenAPI service instance.
        :param method_invocation_descriptor: The method name and arguments to be passed to the method. The payload
        should contain the method name (key: "name") and the arguments (key: "arguments"). The name is a string, and
        the arguments are a dictionary of key-value pairs (possibly empty for operations without parameters).
        :return: The raw service response: the decoded JSON body for JSON content types, otherwise the response text,
            or None when the matched response declares no content.
        :raises ValueError: If the descriptor has no name, its arguments are not a dictionary, or a required
            parameter / request body property is missing.
        :raises RuntimeError: If the method is not found in the OpenAPI specification.
        """
        name = method_invocation_descriptor.get("name")
        arguments = method_invocation_descriptor.get("arguments")
        # Operations without parameters are legitimately called with no or empty arguments.
        if arguments is None:
            arguments = {}
        if not name or not isinstance(arguments, dict):
            raise ValueError(
                f"Invalid function calling descriptor: {method_invocation_descriptor} . It should contain "
                f"a method name and arguments."
            )
        invocation_arguments = copy(arguments)

        # openapi3 specific method to call the operation, do we have it?
        method_to_call = getattr(openapi_service, f"call_{name}", None)
        if not callable(method_to_call):
            raise RuntimeError(f"Operation {name} not found in OpenAPI specification {openapi_service.info.title}")

        # get the operation reference from the method_to_call
        operation = method_to_call.operation.__self__
        operation_dict = operation.raw_element

        # Pack URL/query parameters under "parameters" key
        method_call_params: Dict[str, Dict[str, Any]] = defaultdict(dict)
        parameters = operation_dict.get("parameters", [])
        request_body = operation_dict.get("requestBody", {})

        # A value counts as provided unless it is None, so falsy values such as 0, False or "" are forwarded.
        for param in parameters:
            param_name = param["name"]
            param_value = invocation_arguments.get(param_name)
            if param_value is not None:
                method_call_params["parameters"][param_name] = param_value
            elif param.get("required", False):
                raise ValueError(f"Missing parameter: '{param_name}' required for the '{name}' operation.")

        # Pack request body parameters under "data" key
        if request_body:
            schema = request_body.get("content", {}).get("application/json", {}).get("schema", {})
            required_params = schema.get("required", [])
            for param_name in schema.get("properties", {}):
                param_value = invocation_arguments.get(param_name)
                if param_value is not None:
                    method_call_params["data"][param_name] = param_value
                elif param_name in required_params:
                    raise ValueError(
                        f"Missing requestBody parameter: '{param_name}' required for the '{name}' operation."
                    )

        # call the underlying service REST API with the parameters
        return method_to_call(**method_call_params, raw_response=True)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc