deepset-ai / haystack / 17761758071

16 Sep 2025 09:46AM UTC coverage: 91.995% (-0.05%) from 92.047%
push

github

web-flow
feat: save last `AgentSnapshot` when Agent crashes (#9774)

* raise last good snapshot in PipelineRunTimeError + tests updates

* adding release notes

* renaming test file

* wip: PoC generating Agent snapshot + host pipeline snapshot and saving it to disk

* wip: agent tool error generates a valid snapshot file

* dealing with function serialisation/deserialisation

* wip: fixing typing issues

* fixing typing issues

* wip

* fixing types

* fixing tests

* extending tests for sudden crash + breakpoints and resume

* adding release notes

* merging pipeline tests into single file

* adding missing test file

* WIP: PR comments/improvements

* set test as integration test

* refactor: Updates to saving an AgentSnapshot if execution fails (#9781)

* Updates

* Fix circular import

* Fixes

* Fix license header

* fixing typing

* fixing typing

---------

Co-authored-by: David S. Batista <dsbatista@gmail.com>

* Update haystack/utils/base_serialization.py

Co-authored-by: Amna Mubashar <amnahkhan.ak@gmail.com>

* wip

* fixing tests

* updating deserialisation

* updating more tests, reorganising breakpoints tests

* fixing bug in saving agent_snapshot

* updating tests

* updating tests

* updating tests

* adding tests for serialisation/deserialisation of functions

---------

Co-authored-by: Sebastian Husch Lee <10526848+sjrl@users.noreply.github.com>
Co-authored-by: Amna Mubashar <amnahkhan.ak@gmail.com>

13032 of 14166 relevant lines covered (91.99%)

0.92 hits per line

Source File

88.51% coverage
haystack/utils/base_serialization.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from typing import Any
1✔
6

7
from haystack.core.errors import DeserializationError, SerializationError
1✔
8
from haystack.core.serialization import generate_qualified_class_name, import_class_by_name
1✔
9
from haystack.utils import deserialize_callable, serialize_callable
1✔
10

11

12
def serialize_class_instance(obj: Any) -> dict[str, Any]:
1✔
13
    """
14
    Serializes an object that has a `to_dict` method into a dictionary.
15

16
    :param obj:
17
        The object to be serialized.
18
    :returns:
19
        A dictionary representation of the object.
20
    :raises SerializationError:
21
        If the object does not have a `to_dict` method.
22
    """
23
    if not hasattr(obj, "to_dict"):
1✔
24
        raise SerializationError(f"Object of class '{type(obj).__name__}' does not have a 'to_dict' method")
1✔
25

26
    output = obj.to_dict()
1✔
27
    return {"type": generate_qualified_class_name(type(obj)), "data": output}
1✔
28

29

30
def deserialize_class_instance(data: dict[str, Any]) -> Any:
1✔
31
    """
32
    Deserializes an object from a dictionary representation generated by `serialize_class_instance`.
33

34
    :param data:
35
        The dictionary to deserialize from.
36
    :returns:
37
        The deserialized object.
38
    :raises DeserializationError:
39
        If the serialization data is malformed, the class type cannot be imported, or the
40
        class does not have a `from_dict` method.
41
    """
42
    if "type" not in data:
1✔
43
        raise DeserializationError("Missing 'type' in serialization data")
1✔
44
    if "data" not in data:
1✔
45
        raise DeserializationError("Missing 'data' in serialization data")
1✔
46

47
    try:
1✔
48
        obj_class = import_class_by_name(data["type"])
1✔
49
    except ImportError as e:
1✔
50
        raise DeserializationError(f"Class '{data['type']}' not correctly imported") from e
1✔
51

52
    if not hasattr(obj_class, "from_dict"):
1✔
53
        raise DeserializationError(f"Class '{data['type']}' does not have a 'from_dict' method")
1✔
54

55
    return obj_class.from_dict(data["data"])
1✔
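To make the `{"type": ..., "data": ...}` envelope used by the two functions above concrete, here is a minimal, self-contained sketch of the round trip. The `Point` class, the `REGISTRY` dict, and the `to_envelope` / `from_envelope` helpers are hypothetical names introduced for illustration. The registry lookup only approximates what `import_class_by_name` does, and a plain `ValueError` stands in for the Haystack error classes.

```python
from typing import Any


class Point:
    """Hypothetical class exposing the to_dict/from_dict pair the helpers rely on."""

    def __init__(self, x: int, y: int) -> None:
        self.x = x
        self.y = y

    def to_dict(self) -> dict[str, Any]:
        return {"x": self.x, "y": self.y}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Point":
        return cls(**data)


# Assumption: a fixed registry replaces the dynamic import by qualified name.
REGISTRY: dict[str, type] = {"example.Point": Point}


def to_envelope(obj: Any, type_name: str) -> dict[str, Any]:
    # Wrap the object's own dict form together with a type tag.
    if not hasattr(obj, "to_dict"):
        raise ValueError(f"'{type(obj).__name__}' does not have a 'to_dict' method")
    return {"type": type_name, "data": obj.to_dict()}


def from_envelope(envelope: dict[str, Any]) -> Any:
    # Validate the envelope, resolve the class, and delegate to from_dict.
    for key in ("type", "data"):
        if key not in envelope:
            raise ValueError(f"Missing '{key}' in serialization data")
    cls = REGISTRY[envelope["type"]]
    return cls.from_dict(envelope["data"])


envelope = to_envelope(Point(1, 2), "example.Point")
restored = from_envelope(envelope)
print(envelope)
print(restored.x, restored.y)
```

The type tag is what lets the reader of the envelope pick the right `from_dict`; without it the payload is just an anonymous dictionary.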
56

57

58
def _serialize_value_with_schema(payload: Any) -> dict[str, Any]:
1✔
59
    """
60
    Serializes a value into a schema-aware format suitable for storage or transmission.
61

62
    The output format separates the schema information from the actual data, making it easier
63
    to deserialize complex nested structures correctly.
64

65
    The function handles:
66
    - Objects with to_dict() methods (e.g. dataclasses)
67
    - Objects with __dict__ attributes
68
    - Dictionaries
69
    - Lists, tuples, and sets. Lists with mixed types are not supported.
70
    - Primitive types (str, int, float, bool, None)
71

72
    :param payload: The value to serialize (can be any type)
73
    :returns: The serialized dict representation of the given value. Contains two keys:
74
        - "serialization_schema": Contains type information for each field.
75
        - "serialized_data": Contains the actual data in a simplified format.
76

77
    """
78
    # Handle dictionary case - iterate through fields
79
    if isinstance(payload, dict):
1✔
80
        schema: dict[str, Any] = {}
1✔
81
        data: dict[str, Any] = {}
1✔
82

83
        for field, val in payload.items():
1✔
84
            # Recursively serialize each field
85
            serialized_value = _serialize_value_with_schema(val)
1✔
86
            schema[field] = serialized_value["serialization_schema"]
1✔
87
            data[field] = serialized_value["serialized_data"]
1✔
88

89
        return {"serialization_schema": {"type": "object", "properties": schema}, "serialized_data": data}
1✔
90

91
    # Handle array case - iterate through elements
92
    elif isinstance(payload, (list, tuple, set)):
1✔
93
        # Convert to list for consistent handling
94
        pure_list = _convert_to_basic_types(list(payload))
1✔
95

96
        # Determine item type from first element (if any)
97
        if payload:
1✔
98
            first = next(iter(payload))
1✔
99
            item_schema = _serialize_value_with_schema(first)
1✔
100
            base_schema = {"type": "array", "items": item_schema["serialization_schema"]}
1✔
101
        else:
102
            base_schema = {"type": "array", "items": {}}
1✔
103

104
        # Add JSON Schema properties to infer sets and tuples
105
        if isinstance(payload, set):
1✔
106
            base_schema["uniqueItems"] = True
1✔
107
        elif isinstance(payload, tuple):
1✔
108
            base_schema["minItems"] = len(payload)
1✔
109
            base_schema["maxItems"] = len(payload)
1✔
110

111
        return {"serialization_schema": base_schema, "serialized_data": pure_list}
1✔
112

113
    # Handle Haystack style objects (e.g. dataclasses and Components)
114
    elif hasattr(payload, "to_dict") and callable(payload.to_dict):
1✔
115
        type_name = generate_qualified_class_name(type(payload))
1✔
116
        pure = _convert_to_basic_types(payload)
1✔
117
        schema = {"type": type_name}
1✔
118
        return {"serialization_schema": schema, "serialized_data": pure}
1✔
119

120
    # Handle callable functions serialization
121
    elif callable(payload) and not isinstance(payload, type):
1✔
122
        serialized = serialize_callable(payload)
1✔
123
        return {"serialization_schema": {"type": "typing.Callable"}, "serialized_data": serialized}
1✔
124

125
    # Handle arbitrary objects with __dict__
126
    elif hasattr(payload, "__dict__"):
1✔
127
        type_name = generate_qualified_class_name(type(payload))
×
128
        pure = _convert_to_basic_types(vars(payload))
×
129
        schema = {"type": type_name}
×
130
        return {"serialization_schema": schema, "serialized_data": pure}
×
131

132
    # Handle primitives
133
    else:
134
        prim_type = _primitive_schema_type(payload)
1✔
135
        schema = {"type": prim_type}
1✔
136
        return {"serialization_schema": schema, "serialized_data": payload}
1✔
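The collection handling above can be illustrated with a reduced sketch that produces only the schema part for plain values. `describe` and `primitive_type` are hypothetical names, and the sketch deliberately leaves out the `to_dict`, callable, and `__dict__` branches. It shows how sets and tuples are told apart from lists through JSON Schema style hints, and why mixed-type lists are not supported: only the first element is inspected.

```python
from typing import Any


def primitive_type(value: Any) -> str:
    # bool must be tested before int because bool is a subclass of int.
    if value is None:
        return "null"
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "number"
    return "string"


def describe(value: Any) -> dict[str, Any]:
    """Simplified stand-in: returns only the schema for plain values."""
    if isinstance(value, dict):
        return {"type": "object", "properties": {k: describe(v) for k, v in value.items()}}
    if isinstance(value, (list, tuple, set)):
        # The item schema is inferred from the first element only.
        items = describe(next(iter(value))) if value else {}
        schema: dict[str, Any] = {"type": "array", "items": items}
        if isinstance(value, set):
            schema["uniqueItems"] = True
        elif isinstance(value, tuple):
            schema["minItems"] = len(value)
            schema["maxItems"] = len(value)
        return schema
    return {"type": primitive_type(value)}


list_schema = describe(["a", "b"])
tuple_schema = describe((1, 2, 3))
set_schema = describe({1.5})
mixed_schema = describe([1, "a"])
print(list_schema)
print(tuple_schema)
print(set_schema)
print(mixed_schema)
```

In this sketch the mixed list `[1, "a"]` is described as an array of integers, which is the kind of mismatch the docstring's "mixed types are not supported" remark warns about.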
137

138

139
def _primitive_schema_type(value: Any) -> str:
1✔
140
    """
141
    Helper function to determine the schema type for primitive values.
142
    """
143
    if value is None:
1✔
144
        return "null"
1✔
145
    if isinstance(value, bool):
1✔
146
        return "boolean"
1✔
147
    if isinstance(value, int):
1✔
148
        return "integer"
1✔
149
    if isinstance(value, float):
1✔
150
        return "number"
1✔
151
    if isinstance(value, str):
1✔
152
        return "string"
1✔
153
    return "string"  # fallback
×
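The order of the `isinstance` checks above matters: in Python, `bool` is a subclass of `int`, so testing for `int` first would label `True` as an integer. The short sketch below contrasts the two orders; both function names are hypothetical and exist only for this comparison.

```python
def schema_type_bool_first(value):
    # Same order as the helper above: bool is checked before int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    return "string"


def schema_type_int_first(value):
    # Deliberately wrong order, for comparison only.
    if isinstance(value, int):
        return "integer"
    if isinstance(value, bool):
        return "boolean"  # never reached for True or False
    return "string"


print(issubclass(bool, int))
print(schema_type_bool_first(True), schema_type_int_first(True))
```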
154

155

156
def _convert_to_basic_types(value: Any) -> Any:
1✔
157
    """
158
    Helper function to recursively convert complex Python objects into their basic type equivalents.
159

160
    This helper function traverses through nested data structures and converts all complex
161
    objects (custom classes, dataclasses, etc.) into basic Python types (dict, list, str,
162
    int, float, bool, None) that can be easily serialized.
163

164
    The function handles:
165
    - Objects with to_dict() methods: converted using their to_dict implementation
166
    - Objects with __dict__ attribute: converted to plain dictionaries
167
    - Dictionaries: recursively converted values while preserving keys
168
    - Sequences (list, tuple, set): recursively converted and returned as a plain list
169
    - Function objects: converted to None (functions cannot be serialized)
170
    - Primitive types: returned as-is
171

172
    """
173
    # dataclass‐style objects
174
    if hasattr(value, "to_dict") and callable(value.to_dict):
1✔
175
        return _convert_to_basic_types(value.to_dict())
1✔
176

177
    # Handle function objects - they cannot be serialized, so we return None
178
    if callable(value) and not isinstance(value, type):
1✔
179
        return None
×
180

181
    # arbitrary objects with __dict__
182
    if hasattr(value, "__dict__"):
1✔
183
        return {k: _convert_to_basic_types(v) for k, v in vars(value).items()}
×
184

185
    # dicts
186
    if isinstance(value, dict):
1✔
187
        return {k: _convert_to_basic_types(v) for k, v in value.items()}
1✔
188

189
    # sequences
190
    if isinstance(value, (list, tuple, set)):
1✔
191
        return [_convert_to_basic_types(v) for v in value]
1✔
192

193
    # primitive
194
    return value
1✔
195

196

197
def _deserialize_value_with_schema(serialized: dict[str, Any]) -> Any:  # pylint: disable=too-many-return-statements, # noqa: PLR0911, PLR0912
1✔
198
    """
199
    Deserializes a value with schema information back to its original form.
200

201
    Takes a dict of the form:
202
      {
203
         "serialization_schema": {"type": "integer"} or {"type": "object", "properties": {...}},
204
         "serialized_data": <the actual data>
205
      }
206

207
    :param serialized: The serialized dict with schema and data.
208
    :returns: The deserialized value in its original form.
209
    """
210

211
    if not serialized or "serialization_schema" not in serialized or "serialized_data" not in serialized:
1✔
212
        raise DeserializationError(
×
213
            f"Invalid format of passed serialized payload. Expected a dictionary with keys "
214
            f"'serialization_schema' and 'serialized_data'. Got: {serialized}"
215
        )
216
    schema = serialized["serialization_schema"]
1✔
217
    data = serialized["serialized_data"]
1✔
218

219
    schema_type = schema.get("type")
1✔
220

221
    if not schema_type:
1✔
222
        # The legacy (pre-2.15.0) format has no 'type' key and is no longer supported, so fail with a clear message
223
        raise DeserializationError(
×
224
            "Missing 'type' key in 'serialization_schema'. This likely indicates that you're using a serialized "
225
            "State object created with a version of Haystack older than 2.15.0. "
226
            "Support for the old serialization format is removed in Haystack 2.16.0. "
227
            "Please upgrade to the new serialization format to ensure forward compatibility."
228
        )
229

230
    # Handle object case (dictionary with properties)
231
    if schema_type == "object":
1✔
232
        properties = schema.get("properties")
1✔
233
        if properties:
1✔
234
            result: dict[str, Any] = {}
1✔
235

236
            if isinstance(data, dict):
1✔
237
                for field, raw_value in data.items():
1✔
238
                    field_schema = properties.get(field)
1✔
239
                    if field_schema:
1✔
240
                        # Recursively deserialize each field with its own schema; fields without a schema are skipped
241
                        result[field] = _deserialize_value_with_schema(
1✔
242
                            {"serialization_schema": field_schema, "serialized_data": raw_value}
243
                        )
244

245
            return result
1✔
246
        else:
247
            return _deserialize_value(data)
1✔
248

249
    # Handle array case
250
    elif schema_type == "array":
1✔
251
        # Cache frequently accessed schema properties
252
        item_schema = schema.get("items", {})
1✔
253
        item_type = item_schema.get("type", "any")
1✔
254
        is_set = schema.get("uniqueItems") is True
1✔
255
        is_tuple = schema.get("minItems") is not None and schema.get("maxItems") is not None
1✔
256

257
        # Handle nested objects/arrays first (most complex case)
258
        if item_type in ("object", "array"):
1✔
259
            return [
1✔
260
                _deserialize_value_with_schema({"serialization_schema": item_schema, "serialized_data": item})
261
                for item in data
262
            ]
263

264
        # Helper function to deserialize individual items
265
        def deserialize_item(item):
1✔
266
            if item_type == "any":
1✔
267
                return _deserialize_value(item)
×
268
            else:
269
                return _deserialize_value({"type": item_type, "data": item})
1✔
270

271
        # Handle different collection types
272
        if is_set:
1✔
273
            return {deserialize_item(item) for item in data}
1✔
274
        elif is_tuple:
1✔
275
            return tuple(deserialize_item(item) for item in data)
1✔
276
        else:
277
            return [deserialize_item(item) for item in data]
1✔
278

279
    # Handle primitive types
280
    elif schema_type in ("null", "boolean", "integer", "number", "string"):
1✔
281
        return data
1✔
282

283
    # Handle callable functions
284
    elif schema_type == "typing.Callable":
1✔
285
        return deserialize_callable(data)
1✔
286

287
    # Handle custom class types
288
    else:
289
        return _deserialize_value({"type": schema_type, "data": data})
1✔
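The array branch above decides between list, set, and tuple purely from the schema hints. A reduced sketch of that decision, limited to flat arrays of primitives, might look like the following; `restore_collection` is a hypothetical name and the item-level deserialization is omitted.

```python
from typing import Any


def restore_collection(schema: dict[str, Any], data: list) -> Any:
    """Simplified stand-in covering only flat arrays of primitives."""
    if schema.get("uniqueItems") is True:
        return set(data)
    if schema.get("minItems") is not None and schema.get("maxItems") is not None:
        return tuple(data)
    return list(data)


as_set = restore_collection(
    {"type": "array", "items": {"type": "integer"}, "uniqueItems": True}, [1, 2, 2]
)
as_tuple = restore_collection(
    {"type": "array", "items": {"type": "string"}, "minItems": 2, "maxItems": 2}, ["a", "b"]
)
as_list = restore_collection({"type": "array", "items": {"type": "string"}}, ["a", "b"])
print(as_set, as_tuple, as_list)
```

One detail of the function above that this sketch does not reproduce: when the item type is `"object"` or `"array"`, the nested branch returns before the hints are consulted, so a tuple of dicts or lists comes back as a plain list.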
290

291

292
def _deserialize_value(value: Any) -> Any:  # pylint: disable=too-many-return-statements # noqa: PLR0911
1✔
293
    """
294
    Helper function to deserialize values from their envelope format {"type": T, "data": D}.
295

296
    Handles four cases:
297
    - Typed envelopes: {"type": T, "data": D} where T determines deserialization method
298
    - Plain dicts: recursively deserialize values
299
    - Collections (list/tuple/set): recursively deserialize elements
300
    - Other values: return as-is
301

302
    :param value: The value to deserialize
303
    :returns: The deserialized value
304

305
    """
306
    # 1) Envelope case
307
    if isinstance(value, dict) and "type" in value and "data" in value:
1✔
308
        t = value["type"]
1✔
309
        payload = value["data"]
1✔
310

311
        # 1.a) Array
312
        if t == "array":
1✔
313
            return [_deserialize_value(child) for child in payload]
×
314

315
        # 1.b) Generic object/dict
316
        if t == "object":
1✔
317
            return {k: _deserialize_value(v) for k, v in payload.items()}
×
318

319
        # 1.c) Primitive
320
        if t in ("null", "boolean", "integer", "number", "string"):
1✔
321
            return payload
1✔
322

323
        # 1.d) Callable
324
        if t == "typing.Callable":
1✔
325
            return deserialize_callable(payload)
×
326

327
        # 1.e) Custom class
328
        cls = import_class_by_name(t)
1✔
329
        # first, recursively deserialize the inner payload
330
        deserialized_payload = {k: _deserialize_value(v) for k, v in payload.items()}
1✔
331
        # try from_dict
332
        if hasattr(cls, "from_dict") and callable(cls.from_dict):
1✔
333
            return cls.from_dict(deserialized_payload)
1✔
334
        # fallback: set attributes on a blank instance
335
        instance = cls.__new__(cls)
×
336
        for attr_name, attr_value in deserialized_payload.items():
×
337
            setattr(instance, attr_name, attr_value)
×
338
        return instance
×
339

340
    # 2) Plain dict (no envelope) → recurse
341
    if isinstance(value, dict):
1✔
342
        return {k: _deserialize_value(v) for k, v in value.items()}
1✔
343

344
    # 3) Collections → recurse
345
    if isinstance(value, (list, tuple, set)):
1✔
346
        return type(value)(_deserialize_value(v) for v in value)
1✔
347

348
    # 4) Fallback (shouldn't usually happen with our schema)
349
    return value
1✔
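The last resort in the custom-class case, used when the class has no `from_dict`, allocates an object with `cls.__new__(cls)` and sets attributes directly. The sketch below isolates that technique; `Settings` and `restore_without_init` are hypothetical names. It also shows the main caveat: `__init__` never runs, so anything it would normally set up is missing unless it is part of the payload.

```python
from typing import Any


class Settings:
    """Hypothetical class without from_dict and with a required __init__ argument."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.loaded = True  # set only by __init__, so the fallback will not reproduce it


def restore_without_init(cls: type, attributes: dict[str, Any]) -> Any:
    # Allocate the object without calling __init__, then copy attributes over.
    instance = cls.__new__(cls)
    for name, value in attributes.items():
        setattr(instance, name, value)
    return instance


restored = restore_without_init(Settings, {"path": "/tmp/config"})
print(restored.path)
print(hasattr(restored, "loaded"))
```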