• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 14831172275

05 May 2025 07:20AM UTC coverage: 90.418% (-0.1%) from 90.513%
14831172275

Pull #9290

github

web-flow
Merge 65c6ba7e0 into 7db719981
Pull Request #9290: feat: enable streaming ToolCall/Result from Agent

10908 of 12064 relevant lines covered (90.42%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.81
haystack/core/super_component/super_component.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import functools
1✔
6
from pathlib import Path
1✔
7
from types import new_class
1✔
8
from typing import Any, Dict, List, Optional, Tuple, Union
1✔
9

10
from haystack import logging
1✔
11
from haystack.core.component.component import component
1✔
12
from haystack.core.pipeline.async_pipeline import AsyncPipeline
1✔
13
from haystack.core.pipeline.pipeline import Pipeline
1✔
14
from haystack.core.pipeline.utils import parse_connect_string
1✔
15
from haystack.core.serialization import default_from_dict, default_to_dict, generate_qualified_class_name
1✔
16
from haystack.core.super_component.utils import _delegate_default, _is_compatible
1✔
17

18
logger = logging.getLogger(__name__)
1✔
19

20

21
class InvalidMappingTypeError(Exception):
    """Raised when input or output mappings have invalid types or type conflicts."""
25

26

27
class InvalidMappingValueError(Exception):
    """Raised when input or output mappings have invalid values or missing components/sockets."""
31

32

33
@component
class _SuperComponent:
    """
    Base implementation that wraps a `Pipeline` or `AsyncPipeline` so it can be used as a single component.

    Input and output sockets of the wrapped pipeline are exposed on the component, optionally renamed and
    consolidated through `input_mapping` / `output_mapping`.
    """

    def __init__(
        self,
        pipeline: Union[Pipeline, AsyncPipeline],
        input_mapping: Optional[Dict[str, List[str]]] = None,
        output_mapping: Optional[Dict[str, str]] = None,
    ) -> None:
        """
        Creates a SuperComponent with optional input and output mappings.

        :param pipeline: The pipeline instance or async pipeline instance to be wrapped
        :param input_mapping: A dictionary mapping component input names to pipeline input socket paths.
            If not provided, a default input mapping will be created based on all pipeline inputs.
        :param output_mapping: A dictionary mapping pipeline output socket paths to component output names.
            If not provided, a default output mapping will be created based on all pipeline outputs.
        :raises InvalidMappingTypeError: Raised if a mapping has an invalid type or type conflicts occur
        :raises InvalidMappingValueError: Raised if a mapping references nonexistent components or sockets
        :raises ValueError: Raised if no pipeline is provided
        """
        if pipeline is None:
            raise ValueError("Pipeline must be provided to SuperComponent.")

        self.pipeline: Union[Pipeline, AsyncPipeline] = pipeline
        self._warmed_up = False

        # Determine input types based on pipeline and mapping
        pipeline_inputs = self.pipeline.inputs()
        resolved_input_mapping = (
            input_mapping if input_mapping is not None else self._create_input_mapping(pipeline_inputs)
        )
        self._validate_input_mapping(pipeline_inputs, resolved_input_mapping)
        input_types = self._resolve_input_types_from_mapping(pipeline_inputs, resolved_input_mapping)
        # Set input types on the component
        for input_name, info in input_types.items():
            component.set_input_type(self, name=input_name, **info)

        self.input_mapping: Dict[str, List[str]] = resolved_input_mapping
        # Keep the user-provided mapping (may be None) for faithful serialization in to_dict()
        self._original_input_mapping = input_mapping

        # Set output types based on pipeline and mapping.
        # Output mappings may target intermediate (non-leaf) sockets, so they are validated against
        # all connected outputs, not only the leaf outputs used for the default mapping.
        leaf_pipeline_outputs = self.pipeline.outputs()
        all_possible_pipeline_outputs = self.pipeline.outputs(include_components_with_connected_outputs=True)

        resolved_output_mapping = (
            output_mapping if output_mapping is not None else self._create_output_mapping(leaf_pipeline_outputs)
        )
        self._validate_output_mapping(all_possible_pipeline_outputs, resolved_output_mapping)
        output_types = self._resolve_output_types_from_mapping(all_possible_pipeline_outputs, resolved_output_mapping)
        # Set output types on the component
        component.set_output_types(self, **output_types)
        self.output_mapping: Dict[str, str] = resolved_output_mapping
        self._original_output_mapping = output_mapping

    def warm_up(self) -> None:
        """
        Warms up the SuperComponent by warming up the wrapped pipeline.
        """
        if not self._warmed_up:
            self.pipeline.warm_up()
            self._warmed_up = True

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        """
        Runs the wrapped pipeline with the provided inputs.

        Steps:
        1. Maps the inputs from kwargs to pipeline component inputs
        2. Runs the pipeline
        3. Maps the pipeline outputs to the SuperComponent's outputs

        :param kwargs: Keyword arguments matching the SuperComponent's input names
        :returns:
            Dictionary containing the SuperComponent's output values
        """
        # Drop sentinel defaults so components fall back to their own declared defaults
        filtered_inputs = {param: value for param, value in kwargs.items() if value != _delegate_default}
        pipeline_inputs = self._map_explicit_inputs(input_mapping=self.input_mapping, inputs=filtered_inputs)
        include_outputs_from = self._get_include_outputs_from()
        pipeline_outputs = self.pipeline.run(data=pipeline_inputs, include_outputs_from=include_outputs_from)
        return self._map_explicit_outputs(pipeline_outputs, self.output_mapping)

    def _get_include_outputs_from(self) -> set[str]:
        """
        Returns the names of all components referenced by the output mapping.

        These must be requested as intermediate outputs so that mappings onto
        non-leaf sockets can be resolved after the pipeline run.
        """
        # Collecting the component names from output_mapping
        return {self._split_component_path(path)[0] for path in self.output_mapping.keys()}

    async def run_async(self, **kwargs: Any) -> Dict[str, Any]:
        """
        Runs the wrapped pipeline with the provided inputs async.

        Steps:
        1. Maps the inputs from kwargs to pipeline component inputs
        2. Runs the pipeline async
        3. Maps the pipeline outputs to the SuperComponent's outputs

        :param kwargs: Keyword arguments matching the SuperComponent's input names
        :returns:
            Dictionary containing the SuperComponent's output values
        :raises TypeError:
            If the pipeline is not an AsyncPipeline
        """
        if not isinstance(self.pipeline, AsyncPipeline):
            raise TypeError("Pipeline is not an AsyncPipeline. run_async is not supported.")

        filtered_inputs = {param: value for param, value in kwargs.items() if value != _delegate_default}
        pipeline_inputs = self._map_explicit_inputs(input_mapping=self.input_mapping, inputs=filtered_inputs)
        # Mirror run(): without include_outputs_from, output mappings that reference non-leaf
        # sockets (which _validate_output_mapping explicitly allows) would silently yield nothing.
        include_outputs_from = self._get_include_outputs_from()
        pipeline_outputs = await self.pipeline.run_async(
            data=pipeline_inputs, include_outputs_from=include_outputs_from
        )
        return self._map_explicit_outputs(pipeline_outputs, self.output_mapping)

    @staticmethod
    def _split_component_path(path: str) -> Tuple[str, str]:
        """
        Splits a component path into a component name and a socket name.

        :param path: A string in the format "component_name.socket_name".
        :returns:
            A tuple containing (component_name, socket_name).
        :raises InvalidMappingValueError:
            If the path format is incorrect.
        """
        comp_name, socket_name = parse_connect_string(path)
        if socket_name is None:
            raise InvalidMappingValueError(f"Invalid path format: '{path}'. Expected 'component_name.socket_name'.")
        return comp_name, socket_name

    def _validate_input_mapping(
        self, pipeline_inputs: Dict[str, Dict[str, Any]], input_mapping: Dict[str, List[str]]
    ) -> None:
        """
        Validates the input mapping to ensure that specified components and sockets exist in the pipeline.

        :param pipeline_inputs: A dictionary containing pipeline input specifications.
        :param input_mapping: A dictionary mapping wrapper input names to pipeline socket paths.
        :raises InvalidMappingTypeError:
            If the input mapping is of invalid type or contains invalid types.
        :raises InvalidMappingValueError:
            If the input mapping contains nonexistent components or sockets.
        """
        if not isinstance(input_mapping, dict):
            raise InvalidMappingTypeError("input_mapping must be a dictionary")

        for wrapper_input_name, pipeline_input_paths in input_mapping.items():
            if not isinstance(pipeline_input_paths, list):
                raise InvalidMappingTypeError(f"Input paths for '{wrapper_input_name}' must be a list of strings.")
            for path in pipeline_input_paths:
                comp_name, socket_name = self._split_component_path(path)
                if comp_name not in pipeline_inputs:
                    raise InvalidMappingValueError(f"Component '{comp_name}' not found in pipeline inputs.")
                if socket_name not in pipeline_inputs[comp_name]:
                    raise InvalidMappingValueError(
                        f"Input socket '{socket_name}' not found in component '{comp_name}'."
                    )

    def _resolve_input_types_from_mapping(
        self, pipeline_inputs: Dict[str, Dict[str, Any]], input_mapping: Dict[str, List[str]]
    ) -> Dict[str, Dict[str, Any]]:
        """
        Resolves and validates input types based on the provided input mapping.

        This function ensures that all mapped pipeline inputs are compatible, consolidating types
        when multiple mappings exist. It also determines whether an input is mandatory or has a default value.

        :param pipeline_inputs: A dictionary containing pipeline input specifications.
        :param input_mapping: A dictionary mapping SuperComponent inputs to pipeline socket paths.
        :returns:
            A dictionary specifying the resolved input types and their properties.
        :raises InvalidMappingTypeError:
            If the input mapping contains incompatible types.
        """
        aggregated_inputs: Dict[str, Dict[str, Any]] = {}
        for wrapper_input_name, pipeline_input_paths in input_mapping.items():
            for path in pipeline_input_paths:
                comp_name, socket_name = self._split_component_path(path)
                socket_info = pipeline_inputs[comp_name][socket_name]

                # Add to aggregated inputs
                existing_socket_info = aggregated_inputs.get(wrapper_input_name)
                if existing_socket_info is None:
                    aggregated_inputs[wrapper_input_name] = {"type": socket_info["type"]}
                    if not socket_info["is_mandatory"]:
                        # Delegate the real default to the wrapped component via a sentinel
                        aggregated_inputs[wrapper_input_name]["default"] = _delegate_default
                    continue

                if not _is_compatible(existing_socket_info["type"], socket_info["type"]):
                    raise InvalidMappingTypeError(
                        f"Type conflict for input '{socket_name}' from component '{comp_name}'. "
                        f"Existing type: {existing_socket_info['type']}, new type: {socket_info['type']}."
                    )

                # If any socket requires mandatory inputs then the aggregated input is also considered mandatory.
                # So we use the type of the mandatory input and remove the default value if it exists.
                if socket_info["is_mandatory"]:
                    aggregated_inputs[wrapper_input_name]["type"] = socket_info["type"]
                    aggregated_inputs[wrapper_input_name].pop("default", None)

        return aggregated_inputs

    @staticmethod
    def _create_input_mapping(pipeline_inputs: Dict[str, Dict[str, Any]]) -> Dict[str, List[str]]:
        """
        Create an input mapping from pipeline inputs.

        Sockets with the same name on different components are consolidated under one wrapper input.

        :param pipeline_inputs: Dictionary of pipeline input specifications
        :returns:
            Dictionary mapping SuperComponent input names to pipeline socket paths
        """
        input_mapping: Dict[str, List[str]] = {}
        for comp_name, inputs_dict in pipeline_inputs.items():
            for socket_name in inputs_dict.keys():
                existing_socket_info = input_mapping.get(socket_name)
                if existing_socket_info is None:
                    input_mapping[socket_name] = [f"{comp_name}.{socket_name}"]
                    continue
                input_mapping[socket_name].append(f"{comp_name}.{socket_name}")
        return input_mapping

    def _validate_output_mapping(
        self, pipeline_outputs: Dict[str, Dict[str, Any]], output_mapping: Dict[str, str]
    ) -> None:
        """
        Validates the output mapping to ensure that specified components and sockets exist in the pipeline.

        :param pipeline_outputs: A dictionary containing pipeline output specifications.
        :param output_mapping: A dictionary mapping pipeline socket paths to wrapper output names.
        :raises InvalidMappingTypeError:
            If the output mapping is of invalid type or contains invalid types.
        :raises InvalidMappingValueError:
            If the output mapping contains nonexistent components or sockets.
        """
        for pipeline_output_path, wrapper_output_name in output_mapping.items():
            if not isinstance(wrapper_output_name, str):
                raise InvalidMappingTypeError("Output names in output_mapping must be strings.")
            comp_name, socket_name = self._split_component_path(pipeline_output_path)
            if comp_name not in pipeline_outputs:
                raise InvalidMappingValueError(f"Component '{comp_name}' not found among pipeline outputs.")
            if socket_name not in pipeline_outputs[comp_name]:
                raise InvalidMappingValueError(f"Output socket '{socket_name}' not found in component '{comp_name}'.")

    def _resolve_output_types_from_mapping(
        self, pipeline_outputs: Dict[str, Dict[str, Any]], output_mapping: Dict[str, str]
    ) -> Dict[str, Any]:
        """
        Resolves and validates output types based on the provided output mapping.

        This function ensures that all mapped pipeline outputs are correctly assigned to
        the corresponding SuperComponent outputs while preventing duplicate output names.

        :param pipeline_outputs: A dictionary containing pipeline output specifications.
        :param output_mapping: A dictionary mapping pipeline output socket paths to SuperComponent output names.
        :returns:
            A dictionary mapping SuperComponent output names to their resolved types.
        :raises InvalidMappingValueError:
            If the output mapping contains duplicate output names.
        """
        resolved_outputs = {}
        for pipeline_output_path, wrapper_output_name in output_mapping.items():
            comp_name, socket_name = self._split_component_path(pipeline_output_path)
            if wrapper_output_name in resolved_outputs:
                raise InvalidMappingValueError(f"Duplicate output name '{wrapper_output_name}' in output_mapping.")
            resolved_outputs[wrapper_output_name] = pipeline_outputs[comp_name][socket_name]["type"]
        return resolved_outputs

    @staticmethod
    def _create_output_mapping(pipeline_outputs: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
        """
        Create an output mapping from pipeline outputs.

        :param pipeline_outputs: Dictionary of pipeline output specifications
        :returns:
            Dictionary mapping pipeline socket paths to SuperComponent output names
        :raises InvalidMappingValueError:
            If there are output name conflicts between components
        """
        output_mapping = {}
        used_output_names: set[str] = set()
        for comp_name, outputs_dict in pipeline_outputs.items():
            for socket_name in outputs_dict.keys():
                if socket_name in used_output_names:
                    raise InvalidMappingValueError(
                        f"Output name conflict: '{socket_name}' is produced by multiple components. "
                        "Please provide an output_mapping to resolve this conflict."
                    )
                used_output_names.add(socket_name)
                output_mapping[f"{comp_name}.{socket_name}"] = socket_name
        return output_mapping

    def _map_explicit_inputs(
        self, input_mapping: Dict[str, List[str]], inputs: Dict[str, Any]
    ) -> Dict[str, Dict[str, Any]]:
        """
        Map inputs according to explicit input mapping.

        :param input_mapping: Mapping configuration for inputs
        :param inputs: Input arguments provided to wrapper
        :return: Dictionary of mapped pipeline inputs
        """
        pipeline_inputs: Dict[str, Dict[str, Any]] = {}
        for wrapper_input_name, pipeline_input_paths in input_mapping.items():
            if wrapper_input_name not in inputs:
                continue

            # One wrapper input may fan out to several component sockets
            for socket_path in pipeline_input_paths:
                comp_name, input_name = self._split_component_path(socket_path)
                if comp_name not in pipeline_inputs:
                    pipeline_inputs[comp_name] = {}
                pipeline_inputs[comp_name][input_name] = inputs[wrapper_input_name]

        return pipeline_inputs

    def _map_explicit_outputs(
        self, pipeline_outputs: Dict[str, Dict[str, Any]], output_mapping: Dict[str, str]
    ) -> Dict[str, Any]:
        """
        Map outputs according to explicit output mapping.

        Mapped sockets missing from the pipeline result are skipped rather than raising.

        :param pipeline_outputs: Raw outputs from pipeline execution
        :param output_mapping: Output mapping configuration
        :return: Dictionary of mapped outputs
        """
        outputs: Dict[str, Any] = {}
        for pipeline_output_path, wrapper_output_name in output_mapping.items():
            comp_name, socket_name = self._split_component_path(pipeline_output_path)
            if comp_name in pipeline_outputs and socket_name in pipeline_outputs[comp_name]:
                outputs[wrapper_output_name] = pipeline_outputs[comp_name][socket_name]
        return outputs

    def _to_super_component_dict(self) -> Dict[str, Any]:
        """
        Convert to a SuperComponent dictionary representation.

        :return: Dictionary containing serialized SuperComponent data
        """
        serialized_pipeline = self.pipeline.to_dict()
        serialized = default_to_dict(
            self,
            pipeline=serialized_pipeline,
            input_mapping=self._original_input_mapping,
            output_mapping=self._original_output_mapping,
        )
        # Always advertise the public SuperComponent type so subclasses round-trip through it
        serialized["type"] = generate_qualified_class_name(SuperComponent)
        return serialized
372

373

374
@component
class SuperComponent(_SuperComponent):
    """
    A class for creating super components that wrap around a Pipeline.

    This component allows for remapping of input and output socket names between the wrapped pipeline and the
    SuperComponent's input and output names. This is useful for creating higher-level components that abstract
    away the details of the wrapped pipeline.

    ### Usage example

    ```python
    from haystack import Pipeline, SuperComponent
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.components.builders import ChatPromptBuilder
    from haystack.components.retrievers import InMemoryBM25Retriever
    from haystack.dataclasses.chat_message import ChatMessage
    from haystack.document_stores.in_memory import InMemoryDocumentStore
    from haystack.dataclasses import Document

    document_store = InMemoryDocumentStore()
    documents = [
        Document(content="Paris is the capital of France."),
        Document(content="London is the capital of England."),
    ]
    document_store.write_documents(documents)

    prompt_template = [
        ChatMessage.from_user(
        '''
        According to the following documents:
        {% for document in documents %}
        {{document.content}}
        {% endfor %}
        Answer the given question: {{query}}
        Answer:
        '''
        )
    ]

    prompt_builder = ChatPromptBuilder(template=prompt_template, required_variables="*")

    pipeline = Pipeline()
    pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
    pipeline.add_component("prompt_builder", prompt_builder)
    pipeline.add_component("llm", OpenAIChatGenerator())
    pipeline.connect("retriever.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "llm.messages")

    # Create a super component with simplified input/output mapping
    wrapper = SuperComponent(
        pipeline=pipeline,
        input_mapping={
            "query": ["retriever.query", "prompt_builder.query"],
        },
        output_mapping={"llm.replies": "replies"}
    )

    # Run the pipeline with simplified interface
    result = wrapper.run(query="What is the capital of France?")
    print(result)
    {'replies': [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>,
     _content=[TextContent(text='The capital of France is Paris.')],...)
    ```

    """

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the SuperComponent into a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return self._to_super_component_dict()

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "SuperComponent":
        """
        Deserializes the SuperComponent from a dictionary.

        :param data: The dictionary to deserialize from.
        :returns:
            The deserialized SuperComponent.
        """
        # NOTE(review): the wrapped pipeline is always rebuilt as a synchronous Pipeline here,
        # even if the serialized instance wrapped an AsyncPipeline — confirm whether the
        # serialized form carries enough information to restore the async variant.
        init_params = data["init_parameters"]
        init_params["pipeline"] = Pipeline.from_dict(init_params["pipeline"])
        return default_from_dict(cls, data)

    def show(self, server_url: str = "https://mermaid.ink", params: Optional[dict] = None, timeout: int = 30) -> None:
        """
        Display an image representing this SuperComponent's underlying pipeline in a Jupyter notebook.

        The diagram is produced by a Mermaid rendering server and shown inline; rendering is
        delegated entirely to the wrapped pipeline's own `show`.

        :param server_url:
            The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
            See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
            info on how to set up your own Mermaid server.
        :param params:
            Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details
            Supported keys:
                - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
                - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
                - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
                - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
                - width: Width of the output image (integer).
                - height: Height of the output image (integer).
                - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
                - fit: Whether to fit the diagram size to the page (PDF only, boolean).
                - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
                - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
        :param timeout:
            Timeout in seconds for the request to the Mermaid server.
        :raises PipelineDrawingError:
            If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
        """
        self.pipeline.show(server_url=server_url, params=params, timeout=timeout)

    def draw(
        self, path: Path, server_url: str = "https://mermaid.ink", params: Optional[dict] = None, timeout: int = 30
    ) -> None:
        """
        Save an image representing this SuperComponent's underlying pipeline to the specified file path.

        The diagram is produced by a Mermaid rendering server and written to *path*; rendering is
        delegated entirely to the wrapped pipeline's own `draw`.

        :param path:
            The file path where the generated image will be saved.
        :param server_url:
            The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
            See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
            info on how to set up your own Mermaid server.
        :param params:
            Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details
            Supported keys:
                - format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
                - type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
                - theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
                - bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
                - width: Width of the output image (integer).
                - height: Height of the output image (integer).
                - scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
                - fit: Whether to fit the diagram size to the page (PDF only, boolean).
                - paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
                - landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
        :param timeout:
            Timeout in seconds for the request to the Mermaid server.
        :raises PipelineDrawingError:
            If there is an issue with rendering or saving the image.
        """
        self.pipeline.draw(path=path, server_url=server_url, params=params, timeout=timeout)
532

533

534
def super_component(cls: Any):
    """
    Decorator that converts a class into a SuperComponent.

    This decorator:
    1. Creates a new class that inherits from SuperComponent
    2. Copies all methods and attributes from the original class
    3. Adds initialization logic to properly set up the SuperComponent

    The decorated class should define:
    - pipeline: A Pipeline or AsyncPipeline instance in the __init__ method
    - input_mapping: Dictionary mapping component inputs to pipeline inputs (optional)
    - output_mapping: Dictionary mapping pipeline outputs to component outputs (optional)
    """
    logger.debug("Registering {cls} as a super_component", cls=cls)

    # Keep a handle on the user-defined constructor so the wrapper can delegate to it.
    user_init = cls.__init__

    @functools.wraps(user_init)  # preserve the original signature for IDEs/docs/tools
    def wrapped_init(self, *args, **kwargs):
        """Run the user's __init__, then finish SuperComponent setup from its attributes."""
        user_init(self, *args, **kwargs)

        # The user's constructor is responsible for producing a `pipeline` attribute.
        if not hasattr(self, "pipeline"):
            raise ValueError(f"Class {cls.__name__} decorated with @super_component must define a 'pipeline' attribute")

        _SuperComponent.__init__(
            self,
            pipeline=self.pipeline,
            input_mapping=getattr(self, "input_mapping", None),
            output_mapping=getattr(self, "output_mapping", None),
        )

    def fill_namespace(namespace):
        """Copy all attributes from the original class except special ones."""
        for attr_name, attr_value in dict(cls.__dict__).items():
            # __dict__/__weakref__ must be recreated by the new class, and __init__
            # is replaced with the SuperComponent-aware wrapper.
            if attr_name in ("__dict__", "__weakref__"):
                continue
            namespace[attr_name] = wrapped_init if attr_name == "__init__" else attr_value

    # Build a class inheriting from _SuperComponent plus every base of the original,
    # populated with the original class's namespace (and the wrapped __init__).
    rebuilt_cls = new_class(cls.__name__, (_SuperComponent,) + cls.__bases__, {}, fill_namespace)

    # Carry over identity metadata so the new class reports as the original.
    rebuilt_cls.__module__ = cls.__module__
    rebuilt_cls.__qualname__ = cls.__qualname__
    rebuilt_cls.__doc__ = cls.__doc__

    # Finally register the rebuilt class as a Haystack component.
    return component(rebuilt_cls)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc