• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 15191527043

22 May 2025 04:08PM UTC coverage: 90.345% (-0.07%) from 90.411%
15191527043

Pull #9426

github

web-flow
Merge 212e60881 into 4a5e4d3e6
Pull Request #9426: feat: add component name and type to `StreamingChunk`

11173 of 12367 relevant lines covered (90.35%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.61
haystack/core/super_component/utils.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from typing import Annotated, Any, Dict, List, Optional, Set, Tuple, TypeVar, Union, cast, get_args, get_origin
1✔
6

7
from haystack.core.component.types import HAYSTACK_GREEDY_VARIADIC_ANNOTATION, HAYSTACK_VARIADIC_ANNOTATION
1✔
8

9

10
class _delegate_default:
1✔
11
    """Custom object for delegating filling of default values to the underlying components."""
12

13

14
T = TypeVar("T")
1✔
15

16

17
def _is_compatible(type1: T, type2: T, unwrap_nested: bool = True) -> Tuple[bool, Optional[T]]:
1✔
18
    """
19
    Check if two types are compatible (bidirectional/symmetric check).
20

21
    :param type1: First type to compare
22
    :param type2: Second type to compare
23
    :param unwrap_nested: If True, recursively unwraps nested Optional and Variadic types.
24
        If False, only unwraps at the top level.
25
    :return: Tuple of (True if types are compatible, common type if compatible)
26
    """
27
    type1_unwrapped = _unwrap_all(type1, recursive=unwrap_nested)
1✔
28
    type2_unwrapped = _unwrap_all(type2, recursive=unwrap_nested)
1✔
29

30
    return _types_are_compatible(type1_unwrapped, type2_unwrapped)
1✔
31

32

33
def _types_are_compatible(type1: T, type2: T) -> Tuple[bool, Optional[T]]:
1✔
34
    """
35
    Core type compatibility check implementing symmetric matching.
36

37
    :param type1: First unwrapped type to compare
38
    :param type2: Second unwrapped type to compare
39
    :return: True if types are compatible, False otherwise
40
    """
41
    # Handle Any type
42
    if type1 is Any:
1✔
43
        return True, _convert_to_typing_type(type2)
1✔
44
    if type2 is Any:
1✔
45
        return True, _convert_to_typing_type(type1)
1✔
46

47
    # Direct equality
48
    if type1 == type2:
1✔
49
        return True, _convert_to_typing_type(type1)
1✔
50

51
    type1_origin = get_origin(type1)
1✔
52
    type2_origin = get_origin(type2)
1✔
53

54
    # Handle Union types
55
    if type1_origin is Union or type2_origin is Union:
1✔
56
        return _check_union_compatibility(type1, type2, type1_origin, type2_origin)
1✔
57

58
    # Handle non-Union types
59
    return _check_non_union_compatibility(type1, type2, type1_origin, type2_origin)
1✔
60

61

62
def _check_union_compatibility(type1: T, type2: T, type1_origin: Any, type2_origin: Any) -> Tuple[bool, Optional[T]]:
1✔
63
    """Handle all Union type compatibility cases."""
64
    if type1_origin is Union and type2_origin is not Union:
1✔
65
        # Find all compatible types from the union
66
        compatible_types = []
1✔
67
        for union_arg in get_args(type1):
1✔
68
            is_compat, common = _types_are_compatible(union_arg, type2)
1✔
69
            if is_compat and common is not None:
1✔
70
                compatible_types.append(common)
1✔
71
        if compatible_types:
1✔
72
            # The constructed Union or single type must be cast to Optional[T]
73
            # to satisfy mypy, as T is specific to this function's call context.
74
            result_type = Union[tuple(compatible_types)] if len(compatible_types) > 1 else compatible_types[0]
1✔
75
            return True, cast(Optional[T], result_type)
1✔
76
        return False, None
×
77

78
    if type2_origin is Union and type1_origin is not Union:
1✔
79
        # Find all compatible types from the union
80
        compatible_types = []
1✔
81
        for union_arg in get_args(type2):
1✔
82
            is_compat, common = _types_are_compatible(type1, union_arg)
1✔
83
            if is_compat and common is not None:
1✔
84
                compatible_types.append(common)
1✔
85
        if compatible_types:
1✔
86
            # The constructed Union or single type must be cast to Optional[T]
87
            # to satisfy mypy, as T is specific to this function's call context.
88
            result_type = Union[tuple(compatible_types)] if len(compatible_types) > 1 else compatible_types[0]
1✔
89
            return True, cast(Optional[T], result_type)
1✔
90
        return False, None
1✔
91

92
    # Both are Union types
93
    compatible_types = []
1✔
94
    for arg1 in get_args(type1):
1✔
95
        for arg2 in get_args(type2):
1✔
96
            is_compat, common = _types_are_compatible(arg1, arg2)
1✔
97
            if is_compat and common is not None:
1✔
98
                compatible_types.append(common)
1✔
99

100
    if compatible_types:
1✔
101
        # The constructed Union or single type must be cast to Optional[T]
102
        # to satisfy mypy, as T is specific to this function's call context.
103
        result_type = Union[tuple(compatible_types)] if len(compatible_types) > 1 else compatible_types[0]
1✔
104
        return True, cast(Optional[T], result_type)
1✔
105
    return False, None
×
106

107

108
def _check_non_union_compatibility(
1✔
109
    type1: T, type2: T, type1_origin: Any, type2_origin: Any
110
) -> Tuple[bool, Optional[T]]:
111
    """Handle non-Union type compatibility cases."""
112
    # If no origin, compare types directly
113
    if not type1_origin and not type2_origin:
1✔
114
        if type1 == type2:
1✔
115
            return True, type1
×
116
        return False, None
1✔
117

118
    # Both must have origins and they must be equal
119
    if not (type1_origin and type2_origin and type1_origin == type2_origin):
1✔
120
        return False, None
1✔
121

122
    # Compare generic type arguments
123
    type1_args = get_args(type1)
1✔
124
    type2_args = get_args(type2)
1✔
125

126
    if len(type1_args) != len(type2_args):
1✔
127
        return False, None
×
128

129
    # Check if all arguments are compatible
130
    common_args = []
1✔
131
    for t1_arg, t2_arg in zip(type1_args, type2_args):
1✔
132
        is_compat, common = _types_are_compatible(t1_arg, t2_arg)
1✔
133
        if not is_compat:
1✔
134
            return False, None
1✔
135
        common_args.append(common)
×
136

137
    # Reconstruct the type with common arguments
138
    typing_type = _convert_to_typing_type(type1_origin)
×
139
    return True, cast(Optional[T], typing_type[tuple(common_args)])
×
140

141

142
def _unwrap_all(t: T, recursive: bool) -> T:
1✔
143
    """
144
    Unwrap a type until no more unwrapping is possible.
145

146
    :param t: Type to unwrap
147
    :param recursive: If True, recursively unwraps nested types
148
    :return: The fully unwrapped type
149
    """
150
    # First handle top-level Variadic/GreedyVariadic
151
    if _is_variadic_type(t):
1✔
152
        t = _unwrap_variadics(t, recursive=recursive)
1✔
153
    else:
154
        # If it's a generic type and we're unwrapping recursively
155
        origin = get_origin(t)
1✔
156
        if recursive and origin is not None and (args := get_args(t)):
1✔
157
            unwrapped_args = tuple(_unwrap_all(arg, recursive) for arg in args)
1✔
158
            t = origin[unwrapped_args]
1✔
159

160
    # Then handle top-level Optional
161
    if _is_optional_type(t):
1✔
162
        t = _unwrap_optionals(t, recursive=recursive)
1✔
163

164
    return t
1✔
165

166

167
def _is_variadic_type(t: T) -> bool:
1✔
168
    """Check if type is a Variadic or GreedyVariadic type."""
169
    origin = get_origin(t)
1✔
170
    if origin is Annotated:
1✔
171
        args = get_args(t)
1✔
172
        return len(args) >= 2 and args[1] in (HAYSTACK_VARIADIC_ANNOTATION, HAYSTACK_GREEDY_VARIADIC_ANNOTATION)  # noqa: PLR2004
1✔
173
    return False
1✔
174

175

176
def _is_optional_type(t: T) -> bool:
1✔
177
    """Check if type is an Optional type."""
178
    origin = get_origin(t)
1✔
179
    if origin is Union:
1✔
180
        args = get_args(t)
1✔
181
        return type(None) in args
1✔
182
    return False
1✔
183

184

185
def _unwrap_variadics(t: T, recursive: bool) -> T:
1✔
186
    """
187
    Unwrap Variadic or GreedyVariadic annotated types.
188

189
    :param t: Type to unwrap
190
    :param recursive: If True, recursively unwraps nested types
191
    :return: Unwrapped type if it was a variadic type, original type otherwise
192
    """
193
    if not _is_variadic_type(t):
1✔
194
        return t
×
195

196
    args = get_args(t)
1✔
197
    # Get the Iterable[X] type and extract X
198
    iterable_type = args[0]
1✔
199
    inner_type = get_args(iterable_type)[0]
1✔
200

201
    # Only recursively unwrap if requested
202
    if recursive:
1✔
203
        return _unwrap_all(inner_type, recursive)
1✔
204
    return inner_type
1✔
205

206

207
def _unwrap_optionals(t: T, recursive: bool) -> T:
1✔
208
    """
209
    Unwrap Optional[...] types (Union[X, None]).
210

211
    :param t: Type to unwrap
212
    :param recursive: If True, recursively unwraps nested types
213
    :return: Unwrapped type if it was an Optional, original type otherwise
214
    """
215
    if not _is_optional_type(t):
1✔
216
        return t
×
217

218
    args = list(get_args(t))
1✔
219
    args.remove(type(None))
1✔
220
    result = args[0] if len(args) == 1 else Union[tuple(args)]  # type: ignore
1✔
221

222
    # Only recursively unwrap if requested
223
    if recursive:
1✔
224
        return _unwrap_all(result, recursive)  # type: ignore
1✔
225
    return result  # type: ignore
×
226

227

228
def _convert_to_typing_type(t: Any) -> Any:
1✔
229
    """
230
    Convert built-in Python types to their typing equivalents.
231

232
    :param t: Type to convert
233
    :return: The type using typing module types
234
    """
235
    origin = get_origin(t)
1✔
236
    args = get_args(t)
1✔
237

238
    # Mapping of built-in types to their typing equivalents
239
    type_converters = {
1✔
240
        list: lambda: List if not args else List[Any],
241
        dict: lambda: Dict if not args else Dict[Any, Any],
242
        set: lambda: Set if not args else Set[Any],
243
        tuple: lambda: Tuple if not args else Tuple[Any, ...],
244
    }
245

246
    # Recursive argument handling
247
    if origin in type_converters:
1✔
248
        result = type_converters[origin]()
1✔
249
        if args:
1✔
250
            if origin == list:
1✔
251
                return List[_convert_to_typing_type(args[0])]  # type: ignore
1✔
252
            if origin == dict:
×
253
                return Dict[_convert_to_typing_type(args[0]), _convert_to_typing_type(args[1])]  # type: ignore
×
254
            if origin == set:
×
255
                return Set[_convert_to_typing_type(args[0])]  # type: ignore
×
256
            if origin == tuple:
×
257
                return Tuple[tuple(_convert_to_typing_type(arg) for arg in args)]
×
258
        return result
×
259
    return t
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc