• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 8096865523

29 Feb 2024 01:31PM UTC coverage: 89.905% (-0.2%) from 90.144%
8096865523

push

github

web-flow
chore: enforce kwarg logging (#7207)

* chore: add logger which eases logging of extras

* chore: start migrating to key value

* fix: import fixes

* tests: temporarily comment out breaking test

* refactor: move to kwarg based logging

* style: fix import order

* chore: implement self-review comments

* test: drop failing test

* chore: fix more import orders

* docs: add changelog

* tests: fix broken tests

* chore: fix getting the frames

* chore: add comment

* chore: cleanup

* chore: adapt remaining `%s` usages

5281 of 5874 relevant lines covered (89.9%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.85
haystack/components/converters/openapi_functions.py
1
import json
1✔
2
import os
1✔
3
from pathlib import Path
1✔
4
from typing import Any, Dict, List, Optional, Union
1✔
5

6
import requests
1✔
7
import yaml
1✔
8
from requests import RequestException
1✔
9

10
from haystack import Document, component, logging
1✔
11
from haystack.dataclasses.byte_stream import ByteStream
1✔
12
from haystack.lazy_imports import LazyImport
1✔
13

14
logger = logging.getLogger(__name__)
1✔
15

16
with LazyImport("Run 'pip install jsonref'") as openapi_imports:
1✔
17
    import jsonref
1✔
18

19

20
@component
class OpenAPIServiceToFunctions:
    """
    Converts OpenAPI service specifications into function definitions suitable for OpenAI function calling.

    Given an OpenAPI specification, the component processes it and extracts function definitions that can be
    invoked via OpenAI's function calling mechanism. The format of the extracted functions is compatible with
    OpenAI's function calling JSON format.

    Minimal requirements for OpenAPI specification:
    - OpenAPI version 3.0.0 or higher
    - Each path must have:
        - a unique operationId
        - a description (the summary is used as a fallback when the description is missing)
        - a requestBody or parameters or both
        - a schema for the requestBody and/or parameters

    See https://github.com/OAI/OpenAPI-Specification for more details on OpenAPI specification.
    See https://platform.openai.com/docs/guides/function-calling for more details on OpenAI function calling.
    """

    # Lowest supported major version of the OpenAPI specification.
    MIN_REQUIRED_OPENAPI_SPEC_VERSION = 3

    def __init__(self):
        """
        Initializes the OpenAPIServiceToFunctions instance and runs the lazy-import check for `jsonref`.
        """
        openapi_imports.check()

    @component.output_types(documents=List[Document])
    def run(
        self, sources: List[Union[str, Path, ByteStream]], system_messages: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Processes OpenAPI specification URLs or files to extract functions that can be invoked via OpenAI function
        calling mechanism. Each source is paired with an optional system message. The system message can be potentially
        used in LLM response generation.

        Processing is best effort: a source that has an unsupported type, cannot be read, or cannot be parsed is
        logged and skipped, and the remaining sources are still processed.

        :param sources: A list of OpenAPI specification sources, which can be URLs, file paths, or ByteStream objects.
        :type sources: List[Union[str, Path, ByteStream]]
        :param system_messages: A list of optional system messages corresponding to each source. If fewer messages
                                than sources are given, the remaining sources get no system message.
        :type system_messages: Optional[List[str]]
        :return: A dictionary with a key 'documents' containing a list of Document objects. Each Document object
                 encapsulates a function definition and relevant metadata.
        :rtype: Dict[str, Any]
        """
        documents: List[Document] = []

        # Pad the messages so that zip() below never drops a source when fewer messages than sources are supplied.
        messages: List[str] = list(system_messages or [])
        if messages and len(messages) != len(sources):
            logger.warning(
                "Number of system messages ({num_messages}) does not match number of sources ({num_sources}). "
                "Sources without a matching system message are processed without one.",
                num_messages=len(messages),
                num_sources=len(sources),
            )
        messages.extend([""] * (len(sources) - len(messages)))

        for source, system_message in zip(sources, messages):
            openapi_spec_content = None
            if isinstance(source, (str, Path)):
                # check if the source is a file path or a URL
                if os.path.exists(source):
                    openapi_spec_content = self._read_from_file(source)
                else:
                    openapi_spec_content = self._read_from_url(str(source))
            elif isinstance(source, ByteStream):
                try:
                    openapi_spec_content = source.data.decode("utf-8")
                except UnicodeDecodeError as e:
                    # Same policy as unreadable files and URLs: report and move on to the next source.
                    logger.warning("Could not decode ByteStream source as UTF-8. Error: {error}", error=e)
            else:
                logger.warning(
                    "Invalid source type {source}. Only str, Path, and ByteStream are supported.", source=type(source)
                )
                continue

            if not openapi_spec_content:
                continue

            try:
                service_openapi_spec = self._parse_openapi_spec(openapi_spec_content)
                functions: List[Dict[str, Any]] = self._openapi_to_functions(service_openapi_spec)
                for function in functions:
                    meta: Dict[str, Any] = {"spec": service_openapi_spec}
                    if system_message:
                        meta["system_message"] = system_message
                    documents.append(Document(content=json.dumps(function), meta=meta))
            except Exception as e:
                # Deliberately broad: one malformed specification must not abort the remaining sources.
                logger.error(
                    "Error processing OpenAPI specification from source {source}: {error}", source=source, error=e
                )

        return {"documents": documents}

    def _openapi_to_functions(self, service_openapi_spec: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Extracts functions from the OpenAPI specification of the service and converts them into a format
        suitable for OpenAI function calling.

        :param service_openapi_spec: The OpenAPI specification from which functions are to be extracted.
        :type service_openapi_spec: Dict[str, Any]
        :return: A list of dictionaries, each representing a function. Each dictionary includes the function's
                 name, description, and a schema of its parameters.
        :rtype: List[Dict[str, Any]]
        :raises ValueError: If the specification has no usable `openapi` version or the major version is lower
                            than `MIN_REQUIRED_OPENAPI_SPEC_VERSION`.
        """

        # Doesn't enforce rigid spec validation because that would require a lot of dependencies
        # We check the version and require minimal fields to be present, so we can extract functions
        spec_version = service_openapi_spec.get("openapi")
        if not spec_version:
            raise ValueError(f"Invalid OpenAPI spec provided. Could not extract version from {service_openapi_spec}")

        # YAML loads an unquoted two-part version such as `openapi: 3.1` as a float, so normalize to str first.
        try:
            service_openapi_spec_version = int(str(spec_version).split(".")[0])
        except ValueError as e:
            raise ValueError(f"Invalid OpenAPI spec version {spec_version!r}. Could not extract major version.") from e

        # Compare the versions
        if service_openapi_spec_version < OpenAPIServiceToFunctions.MIN_REQUIRED_OPENAPI_SPEC_VERSION:
            raise ValueError(
                f"Invalid OpenAPI spec version {service_openapi_spec_version}. Must be "
                f"at least {OpenAPIServiceToFunctions.MIN_REQUIRED_OPENAPI_SPEC_VERSION}."
            )

        functions: List[Dict[str, Any]] = []
        for paths in service_openapi_spec["paths"].values():
            for path_spec in paths.values():
                function_dict = self._parse_endpoint_spec(path_spec)
                if function_dict:
                    functions.append(function_dict)
        return functions

    def _parse_endpoint_spec(self, resolved_spec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Builds a single function definition from one entry of a path item.

        The function name is taken from `operationId`, the description from `description` (falling back to
        `summary`), and the parameter schema from the `application/json` request body properties and the
        `parameters` list.

        :param resolved_spec: One value of a path item, expected to be an operation object.
        :return: A dictionary with the keys 'name', 'description' and 'parameters', or an empty dictionary if the
                 entry is not a dictionary or lacks a name, a description, or any properties.
        """
        if not isinstance(resolved_spec, dict):
            logger.warning("Invalid OpenAPI spec format provided. Could not extract function.")
            return {}

        function_name = resolved_spec.get("operationId")
        description = resolved_spec.get("description") or resolved_spec.get("summary", "")

        schema: Dict[str, Any] = {"type": "object", "properties": {}}

        # requestBody section
        req_body_schema = (
            resolved_spec.get("requestBody", {}).get("content", {}).get("application/json", {}).get("schema", {})
        )
        if "properties" in req_body_schema:
            for prop_name, prop_schema in req_body_schema["properties"].items():
                schema["properties"][prop_name] = self._parse_property_attributes(prop_schema)

            if "required" in req_body_schema:
                schema.setdefault("required", []).extend(req_body_schema["required"])

        # parameters section
        for param in resolved_spec.get("parameters", []):
            if "schema" in param:
                schema_dict = self._parse_property_attributes(param["schema"])
                # these attributes are not in param[schema] level but on param level
                useful_attributes = ["description", "pattern", "enum"]
                schema_dict.update({key: param[key] for key in useful_attributes if param.get(key)})
                schema["properties"][param["name"]] = schema_dict
                if param.get("required", False):
                    schema.setdefault("required", []).append(param["name"])

        if function_name and description and schema["properties"]:
            return {"name": function_name, "description": description, "parameters": schema}

        logger.warning(
            "Invalid OpenAPI spec format provided. Could not extract function from {spec}", spec=resolved_spec
        )
        return {}

    def _parse_property_attributes(
        self, property_schema: Dict[str, Any], include_attributes: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Recursively parses the attributes of a property schema, including nested objects and arrays,
        and includes specified attributes like description, pattern, etc.

        :param property_schema: The schema of the property to parse.
        :param include_attributes: The list of attributes to include in the parsed schema. Defaults to
                                   'description', 'pattern' and 'enum'.
        :return: The parsed schema of the property including the specified attributes.
        """
        include_attributes = include_attributes or ["description", "pattern", "enum"]

        schema_type = property_schema.get("type")

        parsed_schema = {"type": schema_type} if schema_type else {}
        for attr in include_attributes:
            if attr in property_schema:
                parsed_schema[attr] = property_schema[attr]

        if schema_type == "object":
            properties = property_schema.get("properties", {})
            parsed_schema["properties"] = {
                prop_name: self._parse_property_attributes(prop, include_attributes)
                for prop_name, prop in properties.items()
            }

            if "required" in property_schema:
                parsed_schema["required"] = property_schema["required"]

        elif schema_type == "array":
            items = property_schema.get("items", {})
            parsed_schema["items"] = self._parse_property_attributes(items, include_attributes)

        return parsed_schema

    def _parse_openapi_spec(self, content: str) -> Dict[str, Any]:
        """
        Parses OpenAPI specification content, supporting both JSON and YAML formats.

        JSON is tried first. If that fails and the content does not look like JSON, the content is parsed as YAML.

        :param content: The content of the OpenAPI specification.
        :return: The parsed OpenAPI specification with references replaced by `jsonref.replace_refs`.
        :raises json.JSONDecodeError: If the content starts with '{' or '[' but is not valid JSON.
        :raises RuntimeError: If the content is neither valid JSON nor valid YAML.
        """
        try:
            open_api_spec_content = json.loads(content)
            return jsonref.replace_refs(open_api_spec_content)
        except json.JSONDecodeError:
            # heuristic to confirm that the content is likely malformed JSON
            if content.strip().startswith(("{", "[")):
                raise

        try:
            open_api_spec_content = yaml.safe_load(content)
        except yaml.YAMLError as yaml_error:
            error_message = (
                "Failed to parse the OpenAPI specification. "
                "The content does not appear to be valid JSON or YAML.\n\n"
            )
            # Chain the YAML error so the underlying parse failure stays visible in the traceback.
            raise RuntimeError(error_message, content) from yaml_error

        # Replace references in the object with their resolved values, if any
        return jsonref.replace_refs(open_api_spec_content)

    def _read_from_file(self, path: Union[str, Path]) -> Optional[str]:
        """
        Reads the content of a file, given its path.

        The file is read as UTF-8, matching how ByteStream sources are decoded in `run`.

        :param path: The path of the file.
        :type path: Union[str, Path]
        :return: The content of the file or None if the file cannot be read.
        """
        try:
            with open(path, "r", encoding="utf-8") as f:
                return f.read()
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is not an OSError; catch it so undecodable files also yield None.
            logger.warning("IO error reading file: {path}. Error: {error}", path=path, error=e)
            return None

    def _read_from_url(self, url: str) -> Optional[str]:
        """
        Reads the content of a URL.

        :param url: The URL to read.
        :type url: str
        :return: The content of the URL or None if the URL cannot be read.
        """
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.text
        except RequestException as e:
            logger.warning("Error fetching URL: {url}. Error: {error}", url=url, error=e)
            return None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc