• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 16782975494

06 Aug 2025 04:40PM UTC coverage: 91.927% (-0.004%) from 91.931%
16782975494

Pull #9678

github

web-flow
Merge 65a54bd0a into 4b9fb20ba
Pull Request #9678: Chore/pep585 type hints

12787 of 13910 relevant lines covered (91.93%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.73
haystack/components/validators/json_schema.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import json
1✔
6
from typing import Any, Optional
1✔
7

8
from jsonschema import ValidationError, validate
1✔
9

10
from haystack import component
1✔
11
from haystack.dataclasses import ChatMessage
1✔
12

13

14
def is_valid_json(s: str) -> bool:
    """
    Report whether a string can be parsed as JSON.

    :param s: The candidate string.
    :returns: `True` when `json.loads` accepts the string; `False` when parsing raises `ValueError`.
    """
    try:
        json.loads(s)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass, so malformed input lands here.
        parsed_ok = False
    else:
        parsed_ok = True
    return parsed_ok
1✔
26

27

28
@component
class JsonSchemaValidator:
    """
    Validates JSON content of `ChatMessage` against a specified [JSON Schema](https://json-schema.org/).

    If JSON content of a message conforms to the provided schema, the message is passed along the "validated" output.
    If the JSON content does not conform to the schema, the message is passed along the "validation_error" output.
    In the latter case, the error message is constructed using the provided `error_template` or a default template.
    These error ChatMessages can be used by LLMs in Haystack 2.x recovery loops.

    Usage example:

    ```python
    from haystack import Pipeline
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.components.joiners import BranchJoiner
    from haystack.components.validators import JsonSchemaValidator
    from haystack import component
    from haystack.dataclasses import ChatMessage


    @component
    class MessageProducer:

        @component.output_types(messages=list[ChatMessage])
        def run(self, messages: list[ChatMessage]) -> dict:
            return {"messages": messages}


    p = Pipeline()
    p.add_component("llm", OpenAIChatGenerator(model="gpt-4-1106-preview",
                                               generation_kwargs={"response_format": {"type": "json_object"}}))
    p.add_component("schema_validator", JsonSchemaValidator())
    p.add_component("joiner_for_llm", BranchJoiner(list[ChatMessage]))
    p.add_component("message_producer", MessageProducer())

    p.connect("message_producer.messages", "joiner_for_llm")
    p.connect("joiner_for_llm", "llm")
    p.connect("llm.replies", "schema_validator.messages")
    p.connect("schema_validator.validation_error", "joiner_for_llm")

    result = p.run(data={
        "message_producer": {
            "messages": [ChatMessage.from_user("Generate JSON for person with name 'John' and age 30")]
        },
        "schema_validator": {
            "json_schema": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "age": {"type": "integer"}
                }
            }
        }
    })
    print(result)
    >> {'schema_validator': {'validated': [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>,
    _content=[TextContent(text="\\n{\\n  "name": "John",\\n  "age": 30\\n}")],
    _name=None, _meta={'model': 'gpt-4-1106-preview', 'index': 0,
    'finish_reason': 'stop', 'usage': {'completion_tokens': 17, 'prompt_tokens': 20, 'total_tokens': 37}})]}}
    ```
    """

    # Default error description template
    default_error_template = (
        "The following generated JSON does not conform to the provided schema.\n"
        "Generated JSON: {failing_json}\n"
        "Error details:\n- Message: {error_message}\n"
        "- Error Path in JSON: {error_path}\n"
        "- Schema Path: {error_schema_path}\n"
        "Please match the following schema:\n"
        "{json_schema}\n"
        "and provide the corrected JSON content ONLY. Please do not output anything else than the raw corrected "
        "JSON string, this is the most important part of the task. Don't use any markdown and don't add any comment."
    )

    def __init__(self, json_schema: Optional[dict[str, Any]] = None, error_template: Optional[str] = None):
        """
        Initialize the JsonSchemaValidator component.

        :param json_schema: A dictionary representing the [JSON schema](https://json-schema.org/) against which
            the messages' content is validated.
        :param error_template: A custom template string for formatting the error message in case of validation failure.
        """
        self.json_schema = json_schema
        self.error_template = error_template

    @component.output_types(validated=list[ChatMessage], validation_error=list[ChatMessage])
    def run(
        self,
        messages: list[ChatMessage],
        json_schema: Optional[dict[str, Any]] = None,
        error_template: Optional[str] = None,
    ) -> dict[str, list[ChatMessage]]:
        """
        Validates the last of the provided messages against the specified json schema.

        If the message conforms to the schema, it is passed along the "validated" output. If it does not, a
        recovery message is passed along the "validation_error" output.

        :param messages: A list of ChatMessage instances to be validated. The last message in this list is the one
            that is validated.
        :param json_schema: A dictionary representing the [JSON schema](https://json-schema.org/)
            against which the messages' content is validated. If not provided, the schema from the component init
            is used.
        :param error_template: A custom template string for formatting the error message in case of validation. If not
            provided, the `error_template` from the component init is used.
        :return:  A dictionary with the following keys:
            - "validated": A list of messages if the last message is valid.
            - "validation_error": A list of messages if the last message is invalid.
        :raises ValueError: If the last message has no text, if no JSON schema is provided, or if the message
            content is not a dictionary or a list of dictionaries.
        :raises IndexError: If `messages` is empty.
        """
        last_message = messages[-1]
        if last_message.text is None:
            raise ValueError(f"The provided ChatMessage has no text. ChatMessage: {last_message}")

        # Parse once: a failure here means the text is not JSON at all, which is reported as a
        # validation error (not an exception) so that an LLM recovery loop can retry.
        try:
            last_message_content = json.loads(last_message.text)
        except ValueError:
            return {
                "validation_error": [
                    ChatMessage.from_user(
                        f"The message '{last_message.text}' is not a valid JSON object. "
                        "Please provide only a valid JSON object in string format. "
                        "Don't use any markdown and don't add any comment."
                    )
                ]
            }

        json_schema = json_schema or self.json_schema
        error_template = error_template or self.error_template or self.default_error_template

        if not json_schema:
            raise ValueError("Provide a JSON schema for validation either in the run method or in the component init.")

        # fc payload is json object but subtree `parameters` is string - we need to convert to json object
        # we need complete json to validate it against schema
        last_message_json = self._recursive_json_to_object(last_message_content)

        using_openai_schema: bool = self._is_openai_function_calling_schema(json_schema)
        # For an OpenAI function-calling schema only the `parameters` subtree is a JSON schema.
        validation_schema = json_schema["parameters"] if using_openai_schema else json_schema

        # Normalize to a list so a single object and a list of objects share one validation loop.
        candidates = last_message_json if isinstance(last_message_json, list) else [last_message_json]

        try:
            for content in candidates:
                instance = content["function"]["arguments"] if using_openai_schema else content
                validate(instance=instance, schema=validation_schema)
        except ValidationError as e:
            error_path = " -> ".join(map(str, e.absolute_path)) if e.absolute_path else "N/A"
            error_schema_path = " -> ".join(map(str, e.absolute_schema_path)) if e.absolute_schema_path else "N/A"

            recovery_prompt = self._construct_error_recovery_message(
                error_template, str(e), error_path, error_schema_path, validation_schema, failing_json=last_message.text
            )
            return {"validation_error": [ChatMessage.from_user(recovery_prompt)]}

        return {"validated": [last_message]}

    def _construct_error_recovery_message(  # pylint: disable=too-many-positional-arguments
        self,
        error_template: str,
        error_message: str,
        error_path: str,
        error_schema_path: str,
        json_schema: dict[str, Any],
        failing_json: str,
    ) -> str:
        """
        Constructs an error recovery message using a specified template or the default one if none is provided.

        :param error_template: A custom template string for formatting the error message in case of validation failure.
        :param error_message: The error message returned by the JSON schema validator.
        :param error_path: The path in the JSON content where the error occurred.
        :param error_schema_path: The path in the JSON schema where the error occurred.
        :param json_schema: The JSON schema against which the content is validated.
        :param failing_json: The generated invalid JSON string.
        :return: The template formatted with the given error details.
        """
        error_template = error_template or self.default_error_template

        return error_template.format(
            error_message=error_message,
            error_path=error_path,
            error_schema_path=error_schema_path,
            json_schema=json_schema,
            failing_json=failing_json,
        )

    def _is_openai_function_calling_schema(self, json_schema: dict[str, Any]) -> bool:
        """
        Checks if the provided schema is a valid OpenAI function calling schema.

        The check is purely structural: the schema must contain the keys "name", "description" and "parameters".

        :param json_schema: The JSON schema to check
        :return: `True` if the schema is a valid OpenAI function calling schema; otherwise, `False`.
        """
        return all(key in json_schema for key in ["name", "description", "parameters"])

    def _recursive_json_to_object(self, data: Any) -> Any:
        """
        Convert any string values that are valid JSON objects into dictionary objects.

        Returns a new data structure. String values that decode to a JSON object or array are replaced by the
        decoded structure, at any nesting depth (including inside lists that are dictionary values). Strings that
        decode to a JSON scalar, or that are not JSON at all, are kept as the original string.

        :param data: The data structure to be traversed.
        :return: A new data structure with JSON strings converted to dictionary objects.
        :raises ValueError: If `data` is neither a dictionary nor a list, or if it is a list containing an item
            that is neither a dictionary nor a list.
        """
        if isinstance(data, list):
            # Items of a top-level list must themselves be dicts or lists; the recursive call enforces this.
            return [self._recursive_json_to_object(item) for item in data]

        if isinstance(data, dict):
            return {key: self._convert_nested_json_value(value) for key, value in data.items()}

        # If it's neither a list nor a dictionary, the input is rejected
        raise ValueError("Input must be a dictionary or a list of dictionaries.")

    def _convert_nested_json_value(self, value: Any) -> Any:
        """
        Convert a single value found below a dictionary, decoding JSON-encoded strings recursively.

        Unlike `_recursive_json_to_object`, scalars are returned unchanged instead of raising, so that lists of
        scalars (for example `["a", "b"]` or a string holding `"[1, 2]"`) are handled without error.

        :param value: The value to convert.
        :return: The converted value; containers are rebuilt, other values are returned as-is.
        """
        if isinstance(value, str):
            try:
                decoded = json.loads(value)
            except json.JSONDecodeError:
                return value
            if isinstance(decoded, (dict, list)):
                return self._convert_nested_json_value(decoded)
            # Preserve the original string when it merely looks like a JSON scalar (e.g. "42", "true").
            return value

        if isinstance(value, dict):
            return {key: self._convert_nested_json_value(item) for key, item in value.items()}

        if isinstance(value, list):
            return [self._convert_nested_json_value(item) for item in value]

        return value
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc