• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 16754475749

05 Aug 2025 03:26PM UTC coverage: 91.946% (+0.05%) from 91.901%
16754475749

Pull #9678

github

web-flow
Merge 31abaf9ce into 323274e17
Pull Request #9678: Chore/pep585 type hints

12774 of 13893 relevant lines covered (91.95%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.74
haystack/core/super_component/utils.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
from typing import Annotated, Any, Optional, TypeVar, Union, cast, get_args, get_origin
1✔
6

7
from haystack.core.component.types import HAYSTACK_GREEDY_VARIADIC_ANNOTATION, HAYSTACK_VARIADIC_ANNOTATION
1✔
8

9

10
class _delegate_default:
1✔
11
    """Custom object for delegating filling of default values to the underlying components."""
12

13

14
T = TypeVar("T")
1✔
15

16

17
def _is_compatible(type1: T, type2: T, unwrap_nested: bool = True) -> tuple[bool, Optional[T]]:
1✔
18
    """
19
    Check if two types are compatible (bidirectional/symmetric check).
20

21
    :param type1: First type to compare
22
    :param type2: Second type to compare
23
    :param unwrap_nested: If True, recursively unwraps nested Optional and Variadic types.
24
        If False, only unwraps at the top level.
25
    :return: Tuple of (True if types are compatible, common type if compatible)
26
    """
27
    type1_unwrapped = _unwrap_all(type1, recursive=unwrap_nested)
1✔
28
    type2_unwrapped = _unwrap_all(type2, recursive=unwrap_nested)
1✔
29

30
    return _types_are_compatible(type1_unwrapped, type2_unwrapped)
1✔
31

32

33
def _types_are_compatible(type1: T, type2: T) -> tuple[bool, Optional[T]]:
1✔
34
    """
35
    Core type compatibility check implementing symmetric matching.
36

37
    :param type1: First unwrapped type to compare
38
    :param type2: Second unwrapped type to compare
39
    :return: True if types are compatible, False otherwise
40
    """
41
    # Handle Any type
42
    if type1 is Any:
1✔
43
        return True, type2
1✔
44
    if type2 is Any:
1✔
45
        return True, type1
1✔
46

47
    # Direct equality
48
    if type1 == type2:
1✔
49
        return True, type1
1✔
50

51
    type1_origin = get_origin(type1)
1✔
52
    type2_origin = get_origin(type2)
1✔
53

54
    # Handle Union types
55
    if type1_origin is Union or type2_origin is Union:
1✔
56
        return _check_union_compatibility(type1, type2, type1_origin, type2_origin)
1✔
57

58
    # Handle non-Union types
59
    return _check_non_union_compatibility(type1, type2, type1_origin, type2_origin)
1✔
60

61

62
def _check_union_compatibility(type1: T, type2: T, type1_origin: Any, type2_origin: Any) -> tuple[bool, Optional[T]]:
1✔
63
    """Handle all Union type compatibility cases."""
64
    if type1_origin is Union and type2_origin is not Union:
1✔
65
        # Find all compatible types from the union
66
        compatible_types = []
1✔
67
        for union_arg in get_args(type1):
1✔
68
            is_compat, common = _types_are_compatible(union_arg, type2)
1✔
69
            if is_compat and common is not None:
1✔
70
                compatible_types.append(common)
1✔
71
        if compatible_types:
1✔
72
            # The constructed Union or single type must be cast to Optional[T]
73
            # to satisfy mypy, as T is specific to this function's call context.
74
            result_type = Union[tuple(compatible_types)] if len(compatible_types) > 1 else compatible_types[0]
1✔
75
            return True, cast(Optional[T], result_type)
1✔
76
        return False, None
×
77

78
    if type2_origin is Union and type1_origin is not Union:
1✔
79
        # Find all compatible types from the union
80
        compatible_types = []
1✔
81
        for union_arg in get_args(type2):
1✔
82
            is_compat, common = _types_are_compatible(type1, union_arg)
1✔
83
            if is_compat and common is not None:
1✔
84
                compatible_types.append(common)
1✔
85
        if compatible_types:
1✔
86
            # The constructed Union or single type must be cast to Optional[T]
87
            # to satisfy mypy, as T is specific to this function's call context.
88
            result_type = Union[tuple(compatible_types)] if len(compatible_types) > 1 else compatible_types[0]
1✔
89
            return True, cast(Optional[T], result_type)
1✔
90
        return False, None
1✔
91

92
    # Both are Union types
93
    compatible_types = []
1✔
94
    for arg1 in get_args(type1):
1✔
95
        for arg2 in get_args(type2):
1✔
96
            is_compat, common = _types_are_compatible(arg1, arg2)
1✔
97
            if is_compat and common is not None:
1✔
98
                compatible_types.append(common)
1✔
99

100
    if compatible_types:
1✔
101
        # The constructed Union or single type must be cast to Optional[T]
102
        # to satisfy mypy, as T is specific to this function's call context.
103
        result_type = Union[tuple(compatible_types)] if len(compatible_types) > 1 else compatible_types[0]
1✔
104
        return True, cast(Optional[T], result_type)
1✔
105
    return False, None
×
106

107

108
def _check_non_union_compatibility(
1✔
109
    type1: T, type2: T, type1_origin: Any, type2_origin: Any
110
) -> tuple[bool, Optional[T]]:
111
    """Handle non-Union type compatibility cases."""
112
    # If no origin, compare types directly
113
    if not type1_origin and not type2_origin:
1✔
114
        if type1 == type2:
1✔
115
            return True, type1
×
116
        return False, None
1✔
117

118
    # Both must have origins and they must be equal
119
    if not (type1_origin and type2_origin and type1_origin == type2_origin):
1✔
120
        return False, None
1✔
121

122
    # Compare generic type arguments
123
    type1_args = get_args(type1)
1✔
124
    type2_args = get_args(type2)
1✔
125

126
    if len(type1_args) != len(type2_args):
1✔
127
        return False, None
×
128

129
    # Check if all arguments are compatible
130
    common_args = []
1✔
131
    for t1_arg, t2_arg in zip(type1_args, type2_args):
1✔
132
        is_compat, common = _types_are_compatible(t1_arg, t2_arg)
1✔
133
        if not is_compat:
1✔
134
            return False, None
1✔
135
        common_args.append(common)
×
136

137
    # Reconstruct the type with common arguments
138
    return True, cast(Optional[T], type1_origin[tuple(common_args)])
×
139

140

141
def _unwrap_all(t: T, recursive: bool) -> T:
1✔
142
    """
143
    Unwrap a type until no more unwrapping is possible.
144

145
    :param t: Type to unwrap
146
    :param recursive: If True, recursively unwraps nested types
147
    :return: The fully unwrapped type
148
    """
149
    # First handle top-level Variadic/GreedyVariadic
150
    if _is_variadic_type(t):
1✔
151
        t = _unwrap_variadics(t, recursive=recursive)
1✔
152
    else:
153
        # If it's a generic type and we're unwrapping recursively
154
        origin = get_origin(t)
1✔
155
        if recursive and origin is not None and (args := get_args(t)):
1✔
156
            unwrapped_args = tuple(_unwrap_all(arg, recursive) for arg in args)
1✔
157
            t = origin[unwrapped_args]
1✔
158

159
    # Then handle top-level Optional
160
    if _is_optional_type(t):
1✔
161
        t = _unwrap_optionals(t, recursive=recursive)
1✔
162

163
    return t
1✔
164

165

166
def _is_variadic_type(t: T) -> bool:
1✔
167
    """Check if type is a Variadic or GreedyVariadic type."""
168
    origin = get_origin(t)
1✔
169
    if origin is Annotated:
1✔
170
        args = get_args(t)
1✔
171
        return len(args) >= 2 and args[1] in (HAYSTACK_VARIADIC_ANNOTATION, HAYSTACK_GREEDY_VARIADIC_ANNOTATION)  # noqa: PLR2004
1✔
172
    return False
1✔
173

174

175
def _is_optional_type(t: T) -> bool:
1✔
176
    """Check if type is an Optional type."""
177
    origin = get_origin(t)
1✔
178
    if origin is Union:
1✔
179
        args = get_args(t)
1✔
180
        return type(None) in args
1✔
181
    return False
1✔
182

183

184
def _unwrap_variadics(t: T, recursive: bool) -> T:
1✔
185
    """
186
    Unwrap Variadic or GreedyVariadic annotated types.
187

188
    :param t: Type to unwrap
189
    :param recursive: If True, recursively unwraps nested types
190
    :return: Unwrapped type if it was a variadic type, original type otherwise
191
    """
192
    if not _is_variadic_type(t):
1✔
193
        return t
×
194

195
    args = get_args(t)
1✔
196
    # Get the Iterable[X] type and extract X
197
    iterable_type = args[0]
1✔
198
    inner_type = get_args(iterable_type)[0]
1✔
199

200
    # Only recursively unwrap if requested
201
    if recursive:
1✔
202
        return _unwrap_all(inner_type, recursive)
1✔
203
    return inner_type
1✔
204

205

206
def _unwrap_optionals(t: T, recursive: bool) -> T:
1✔
207
    """
208
    Unwrap Optional[...] types (Union[X, None]).
209

210
    :param t: Type to unwrap
211
    :param recursive: If True, recursively unwraps nested types
212
    :return: Unwrapped type if it was an Optional, original type otherwise
213
    """
214
    if not _is_optional_type(t):
1✔
215
        return t
×
216

217
    args = list(get_args(t))
1✔
218
    args.remove(type(None))
1✔
219
    result = args[0] if len(args) == 1 else Union[tuple(args)]
1✔
220

221
    # Only recursively unwrap if requested
222
    if recursive:
1✔
223
        return _unwrap_all(result, recursive)  # type: ignore
1✔
224
    return result  # type: ignore
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc