• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IBM / unitxt / 15880205261

25 Jun 2025 03:12PM UTC coverage: 79.77% (+0.06%) from 79.708%
15880205261

push

github

web-flow
Improved error messages (#1838)

* initial

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Improve error messages

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix error

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix ruff

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Add more contextual error information

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix all tests to pass

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix some more tests

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix another test

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix some error and add contexts

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix some tests

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Update inference tests

Signed-off-by: elronbandel <elronbandel@gmail.com>

---------

Signed-off-by: elronbandel <elronbandel@gmail.com>
Co-authored-by: Yoav Katz <68273864+yoavkatz@users.noreply.github.com>

1722 of 2141 branches covered (80.43%)

Branch coverage included in aggregate %.

10699 of 13430 relevant lines covered (79.66%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.35
src/unitxt/operator.py
1
from abc import abstractmethod
1✔
2
from dataclasses import field
1✔
3
from typing import Any, Dict, Generator, List, Optional, Union
1✔
4

5
from pkg_resources import DistributionNotFound, VersionConflict, require
1✔
6

7
from .artifact import Artifact
1✔
8
from .dataclass import FinalField, InternalField, NonPositionalField
1✔
9
from .error_utils import error_context
1✔
10
from .settings_utils import get_constants
1✔
11
from .stream import DynamicStream, EmptyStreamError, MultiStream, Stream
1✔
12

13
# Module-level constants object obtained from settings_utils; this module reads
# `constants.instance_stream` as the default stream name for instance-mode processing.
constants = get_constants()
1✔
14

15

16
class Operator(Artifact):
    """Root marker base class shared by every operator defined in this module."""
18

19

20
class PackageRequirementsMixin(Artifact):
    """Mixin that verifies an artifact's Python package requirements when it is prepared.

    Requirements can be expressed in two shapes:

    - **List format**: plain requirement specifiers, optionally with version
      constraints (e.g. ``["torch>=1.2.4", "numpy<1.19"]``).
    - **Dict format**: specifiers mapped to custom installation instructions
      (e.g. ``{"torch>=1.2.4": "Install torch with `pip install torch>=1.2.4`"}``).

    ``_requirements_list`` is intended for the class-level definition, while
    ``requirements`` is supplied when an instance is created; ``prepare`` checks
    both. Version constraints such as ``torch>=1.2.4`` are enforced through
    ``pkg_resources.require``.
    """

    _requirements_list: Union[List[str], Dict[str, str]] = InternalField(
        default_factory=list
    )
    requirements: Union[List[str], Dict[str, str]] = FinalField(
        also_positional=False, default_factory=list
    )

    def prepare(self):
        # Class-level requirements are checked before instance-level ones.
        for requirement_set in (self._requirements_list, self.requirements):
            self.check_missing_requirements(requirement_set)
        super().prepare()

    def check_missing_requirements(self, requirements=None):
        """Raise ``MissingRequirementsError`` if any requirement is absent or mismatched.

        Args:
            requirements: a list of specifiers, or a dict of specifier ->
                installation instruction. Defaults to ``self._requirements_list``.

        Raises:
            MissingRequirementsError: listing every missing package, every
                version conflict, and one installation instruction per problem.
        """
        if requirements is None:
            requirements = self._requirements_list
        if isinstance(requirements, list):
            # No custom instructions: fall back to generated ones below.
            requirements = dict.fromkeys(requirements, "")

        missing, mismatched, instructions = [], [], []
        for spec, custom_instruction in requirements.items():
            try:
                require(spec)
            except DistributionNotFound:
                missing.append(spec)
                instructions.append(
                    custom_instruction or f"Install {spec} with `pip install {spec}`"
                )
            except VersionConflict as conflict:
                mismatched.append(
                    f"{spec} (installed: {conflict.dist.version}, required: {conflict.req})"
                )
                instructions.append(
                    custom_instruction
                    or f"Update {spec} to the required version with `pip install '{spec}'`"
                )

        if missing or mismatched:
            raise MissingRequirementsError(
                self.__class__.__name__, missing, mismatched, instructions
            )
82

83

84
class MissingRequirementsError(Exception):
    """Raised when an artifact's required packages are absent or have the wrong version.

    Attributes:
        class_name: name of the class whose requirements are unmet.
        missing_packages: requirement specifiers with no installed distribution.
        version_mismatched_packages: descriptions of installed-vs-required conflicts.
        installation_instructions: one instruction per unmet requirement.
        message: the full error text (also the exception's ``str``).
    """

    def __init__(
        self,
        class_name,
        missing_packages,
        version_mismatched_packages,
        installation_instructions,
    ):
        self.class_name = class_name
        self.missing_packages = missing_packages
        self.version_mismatched_packages = version_mismatched_packages
        self.installation_instructions = installation_instructions

        missing_section = ""
        if missing_packages:
            missing_section = f"Missing package(s): {', '.join(missing_packages)}."

        version_section = ""
        if version_mismatched_packages:
            version_section = (
                f"Version mismatch(es): {', '.join(version_mismatched_packages)}."
            )

        header = f"{class_name} requires the following dependencies:"
        # Each section occupies its own line, even when empty, followed by the
        # installation instructions one per line.
        sections = "\n".join([header, missing_section, version_section])
        self.message = sections + "\n" + "\n".join(installation_instructions)
        super().__init__(self.message)
114

115

116
class OperatorError(Exception):
    """Wraps an exception together with the chain of operators it passed through.

    Attributes:
        exception: the original exception.
        operators: operators involved, innermost first.
    """

    def __init__(self, exception: Exception, operators: List[Operator]):
        operator_listing = ",\n".join(str(op) for op in operators)
        super().__init__(
            f"This error was raised by the following operators: {operator_listing}."
        )
        self.exception = exception
        self.operators = operators

    @classmethod
    def from_operator_error(cls, exception: Exception, operator: Operator):
        """Extend an existing ``OperatorError`` with one more enclosing operator."""
        extended_chain = list(exception.operators) + [operator]
        return cls(exception.exception, extended_chain)

    @classmethod
    def from_exception(cls, exception: Exception, operator: Operator):
        """Start a new operator chain from a plain exception."""
        return cls(exception, [operator])
133

134

135
class StreamingOperator(Operator, PackageRequirementsMixin):
    """Abstract base for every operator in the streaming model.

    Streaming operators act on data streams, implementing transformations,
    aggregations, joins, windowing and similar operations. The concrete kinds
    (source operators, processing operators, ...) all derive from this class
    and must implement ``__call__`` so that it returns a ``MultiStream``.

    Because it is an ``Artifact``, any ``StreamingOperator`` can be stored in
    the catalog and referenced later.
    """

    @abstractmethod
    def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
        """Run the operator on its input.

        Args:
            streams (Optional[MultiStream]): The incoming MultiStream; may be None.

        Returns:
            MultiStream: The MultiStream produced by the operator.
        """
159

160

161
class SideEffectOperator(StreamingOperator):
    """Base class for operators that perform a side effect without altering the stream.

    Calling the operator runs ``process()`` and then returns the input
    ``streams`` object unchanged.
    """

    def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
        self.process()
        return streams

    @abstractmethod
    def process(self) -> None:
        """Perform the side effect; implemented by subclasses.

        Declared with ``self`` because ``__call__`` invokes it as a bound method.
        """
        pass
171

172

173
def instance_generator(instance):
    """Yield ``instance`` exactly once (generator backing a single-instance stream)."""
    yield from (instance,)
175

176

177
def stream_single(instance: Dict[str, Any]) -> Stream:
    """Wrap one instance in a lazily evaluated stream that yields just that instance."""
    generator_kwargs = {"instance": instance}
    return DynamicStream(generator=instance_generator, gen_kwargs=generator_kwargs)
181

182

183
class MultiStreamOperator(StreamingOperator):
    """A class representing a multi-stream operator in the streaming system.

    A multi-stream operator is a type of `StreamingOperator` that operates on an entire MultiStream object at once. It takes a `MultiStream` as input and produces a `MultiStream` as output. The `process` method should be implemented by subclasses to define the specific operations to be performed on the input `MultiStream`.

    Calling the operator with keyword arguments (``op(**instance)``) instead
    runs it in instance mode and returns the processed instance.
    """

    caching: bool = NonPositionalField(default=None)

    def __call__(
        self, multi_stream: Optional[MultiStream] = None, **instance: Dict[str, Any]
    ) -> Union[MultiStream, Dict[str, Any]]:
        """Process a whole MultiStream, or a single instance given as keyword arguments.

        Args:
            multi_stream: the input MultiStream.
            **instance: when non-empty, the fields of a single instance; the
                operator then runs in instance mode via ``process_instance``.

        Returns:
            The processed MultiStream (with ``caching`` applied when set), or
            the processed instance in instance mode.
        """
        self.before_process_multi_stream()
        if instance:
            # Instance mode is selected by the presence of instance fields alone,
            # regardless of whether a multi_stream was also passed.
            return self.process_instance(instance)
        result = self._process_multi_stream(multi_stream)
        if self.caching is not None:
            result.set_caching(self.caching)
        return result

    def before_process_multi_stream(self):
        """Hook run at the start of every call; no-op by default."""
        pass

    def _process_multi_stream(
        self, multi_stream: Optional[MultiStream] = None
    ) -> MultiStream:
        result = self.process(multi_stream)
        assert isinstance(
            result, MultiStream
        ), "MultiStreamOperator must return a MultiStream"
        return result

    @abstractmethod
    def process(self, multi_stream: MultiStream) -> MultiStream:
        pass

    def process_instance(self, instance, stream_name=constants.instance_stream):
        """Run the operator on one instance by wrapping it in a one-stream MultiStream."""
        instance = self.verify_instance(instance)
        multi_stream = MultiStream({stream_name: stream_single(instance)})
        processed_multi_stream = self(multi_stream)
        return instance_result(processed_multi_stream[stream_name])
224

225

226
class SourceOperator(MultiStreamOperator):
    """Operator that originates data at the start of a stream-processing pipeline.

    A ``SourceOperator`` is a ``MultiStreamOperator`` that produces a
    ``MultiStream`` from some origin (a database, a file, ...) rather than
    transforming an input one. Subclasses implement the argument-less
    ``process`` method to build that ``MultiStream``.
    """

    def _process_multi_stream(
        self, multi_stream: Optional[MultiStream] = None
    ) -> MultiStream:
        # Any incoming multi_stream is ignored: sources generate their own data.
        produced = self.process()
        assert isinstance(
            produced, MultiStream
        ), "MultiStreamOperator must return a MultiStream"
        return produced

    @abstractmethod
    def process(self) -> MultiStream:
        pass

    def get_splits(self):
        """Return the stream (split) names produced by this source."""
        return [*self.process().keys()]
254

255

256
class StreamInitializerOperator(SourceOperator):
    """A class representing a stream initializer operator in the streaming system.

    A stream initializer operator is a special type of ``SourceOperator`` that is capable
    of taking parameters during the stream generation process.
    This can be useful in situations where the stream generation process needs to be
    customized or configured based on certain parameters.

    When called, a ``StreamInitializerOperator`` invokes its ``process`` method, passing any supplied
    arguments and keyword arguments. The ``process`` method should be implemented by all subclasses
    to generate the required ``MultiStream`` based on the given arguments and keyword arguments.
    """

    caching: bool = NonPositionalField(default=None)

    def __call__(self, *args, **kwargs) -> MultiStream:
        """Build the MultiStream from the given arguments and apply ``caching``.

        ``process`` is invoked exactly once, and the same MultiStream whose
        caching was configured is the one returned.
        """
        multi_stream = self.process(*args, **kwargs)
        if self.caching is not None:
            multi_stream.set_caching(self.caching)
        return multi_stream

    @abstractmethod
    def process(self, *args, **kwargs) -> MultiStream:
        pass
281

282

283
def instance_result(result_stream):
    """Collapse a processed stream into an instance-mode result.

    Returns None for an empty stream, the sole element for a one-element
    stream, and the full list of elements otherwise.
    """
    items = list(result_stream)
    if not items:
        return None
    return items[0] if len(items) == 1 else items
290

291

292
class StreamOperator(MultiStreamOperator):
    """Operator applied independently to each ``Stream`` of a ``MultiStream``.

    Each selected stream is wrapped in a lazily evaluated ``DynamicStream`` that
    runs ``process`` over it; unselected streams pass through untouched.
    Selection is controlled by ``apply_to_streams`` (None means every stream)
    and ``dont_apply_to_streams`` (None means no exclusions). Subclasses
    implement ``process`` to define the per-stream operation.
    """

    apply_to_streams: List[str] = NonPositionalField(
        default=None
    )  # None apply to all streams
    dont_apply_to_streams: List[str] = NonPositionalField(default=None)

    def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
        processed = {}
        for name, current in multi_stream.items():
            if self._is_should_be_processed(name):
                current = self._process_single_stream(current, name)
            assert isinstance(current, Stream), "StreamOperator must return a Stream"
            processed[name] = current
        return MultiStream(processed)

    def _process_single_stream(
        self, stream: Stream, stream_name: Optional[str] = None
    ) -> Stream:
        generator_kwargs = {"stream": stream, "stream_name": stream_name}
        return DynamicStream(self._process_stream, gen_kwargs=generator_kwargs)

    def _is_should_be_processed(self, stream_name):
        """Return whether ``stream_name`` is selected for processing.

        Raises:
            ValueError: if the stream is listed in both selection fields.
        """
        included = self.apply_to_streams is None or stream_name in self.apply_to_streams
        excluded = (
            self.dont_apply_to_streams is not None
            and stream_name in self.dont_apply_to_streams
        )
        if (
            excluded
            and self.apply_to_streams is not None
            and stream_name in self.apply_to_streams
        ):
            raise ValueError(
                f"Stream '{stream_name}' can be in either apply_to_streams or dont_apply_to_streams not both."
            )
        return included and not excluded

    def _process_stream(
        self, stream: Stream, stream_name: Optional[str] = None
    ) -> Generator:
        # Errors raised by process() are annotated with the stream name.
        with error_context(self, stream=stream_name):
            yield from self.process(stream, stream_name)

    @abstractmethod
    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        pass

    def process_instance(self, instance, stream_name=constants.instance_stream):
        """Run the operator on one instance by treating it as a one-element stream."""
        verified = self.verify_instance(instance)
        single = stream_single(verified)
        return instance_result(self._process_single_stream(single, stream_name))
363

364

365
class SingleStreamOperator(StreamOperator):
    """Alias of ``StreamOperator`` that adds no behavior of its own."""
367

368

369
class PagedStreamOperator(StreamOperator):
    """A class representing a paged-stream operator in the streaming system.

    A paged-stream operator is a type of ``StreamOperator`` that operates on a page of instances
    in a ``Stream`` at a time, where a page is a subset of instances.
    The ``process`` method should be implemented by subclasses to define the specific operations
    to be performed on each page.

    Every page, full or final partial, is dispatched through ``_process_page``,
    so an override of ``_process_page`` applies uniformly to all pages.

    Args:
        page_size (int):
            The size of each page in the stream. Defaults to 1000.
    """

    page_size: int = 1000

    def _process_stream(
        self, stream: Stream, stream_name: Optional[str] = None
    ) -> Generator:
        page = []
        page_number = 0
        for instance in stream:
            page.append(instance)
            if len(page) >= self.page_size:
                with error_context(
                    self,
                    stream=stream_name,
                    page=page_number,
                    page_size=len(page),
                ):
                    yield from self._process_page(page, stream_name)
                page = []
                page_number += 1
        if page:  # Handle any remaining instances in the last partial page
            with error_context(
                self,
                stream=stream_name,
                page=page_number,
                page_size=len(page),
                final_page=True,
            ):
                yield from self._process_page(page, stream_name)

    def _process_page(
        self, page: List[Dict], stream_name: Optional[str] = None
    ) -> Generator:
        """Yield the results of processing one page of instances."""
        yield from self.process(page, stream_name)

    @abstractmethod
    def process(self, page: List[Dict], stream_name: Optional[str] = None) -> Generator:
        pass

    def process_instance(self, instance, stream_name=constants.instance_stream):
        """Run the operator on one instance by treating it as a one-instance page."""
        instance = self.verify_instance(instance)
        processed_stream = self._process_page([instance], stream_name)
        return instance_result(processed_stream)
425

426

427
class SingleStreamReducer(StreamingOperator):
    """A class representing a single-stream reducer in the streaming system.

    A single-stream reducer is a type of ``StreamingOperator`` that operates on individual
    ``Stream`` objects within a ``MultiStream`` and reduces each ``Stream`` to a single output value.

    The ``process`` method should be implemented by subclasses to define the specific reduction operation
    to be performed on each ``Stream``.
    """

    def __call__(self, multi_stream: Optional[MultiStream] = None) -> Dict[str, Any]:
        """Reduce every stream of ``multi_stream``.

        Returns:
            A dict mapping each stream name to the value ``process`` returned for it.
        """
        return {
            stream_name: self.process(stream)
            for stream_name, stream in multi_stream.items()
        }

    @abstractmethod
    def process(self, stream: Stream) -> Any:
        """Reduce ``stream`` to a single value (not necessarily a ``Stream``)."""
        pass
449

450

451
class InstanceOperator(StreamOperator):
    """Stream operator that transforms a stream one instance at a time.

    Each instance is checked with ``verify_instance`` and then passed to
    ``process``; subclasses implement ``process`` to define the per-instance
    transformation.
    """

    def _process_stream(
        self, stream: Stream, stream_name: Optional[str] = None
    ) -> Generator:
        # Errors are annotated with the stream name and the instance's position.
        for position, item in enumerate(stream):
            with error_context(self, stream=stream_name, instance=position):
                yield self._process_instance(item, stream_name)

    def _process_instance(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        verified = self.verify_instance(instance)
        return self.process(verified, stream_name)

    @abstractmethod
    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        pass

    def process_instance(self, instance, stream_name=constants.instance_stream):
        """Process a single instance directly, outside of any stream."""
        return self._process_instance(instance, stream_name)
481

482

483
class InstanceOperatorValidator(InstanceOperator):
    """A class representing a stream instance operator validator in the streaming system.

    A stream instance operator validator is a type of ``InstanceOperator`` that includes a validation step.
    It processes individual instances within a ``Stream`` and validates the result of processing
    the first instance of each stream.
    """

    @abstractmethod
    def validate(self, instance, stream_name: Optional[str] = None):
        """Validate a processed instance; implementations raise to reject it.

        Args:
            instance: the processed first instance of a stream.
            stream_name: name of the stream the instance came from (this is how
                ``_process_stream`` calls it).
        """
        pass

    def _process_stream(
        self, stream: Stream, stream_name: Optional[str] = None
    ) -> Generator:
        """Process every instance, validating only the first processed result.

        Raises:
            EmptyStreamError: if the stream has no instances.
        """
        iterator = iter(stream)
        try:
            first_instance = next(iterator)
        except StopIteration as e:
            raise EmptyStreamError(f"Stream '{stream_name}' is empty") from e
        # Same per-instance error context as InstanceOperator._process_stream.
        with error_context(self, stream=stream_name, instance=0):
            result = self._process_instance(first_instance, stream_name)
            self.validate(result, stream_name)
            yield result
        for index, instance in enumerate(iterator, start=1):
            with error_context(self, stream=stream_name, instance=index):
                yield self._process_instance(instance, stream_name)
508

509

510
class InstanceOperatorWithMultiStreamAccess(StreamingOperator):
    """An instance operator that can also read the whole input MultiStream.

    Each instance of each stream is verified and passed to ``process``
    together with the complete input ``multi_stream``, so implementations can
    consult other streams while transforming an instance. Every output stream
    is built lazily as a ``DynamicStream``. Single-instance mode (calling with
    instance keyword arguments) is not supported.
    """

    def __call__(
        self, multi_stream: Optional[MultiStream] = None, **instance: Dict[str, Any]
    ) -> MultiStream:
        if instance:
            raise NotImplementedError("Instance mode is not supported")

        result = {}
        for stream_name, stream in multi_stream.items():
            result[stream_name] = DynamicStream(
                self.generator,
                gen_kwargs={"stream": stream, "multi_stream": multi_stream},
            )
        return MultiStream(result)

    def generator(self, stream, multi_stream):
        """Yield ``process(instance, multi_stream)`` for each verified instance of ``stream``."""
        for index, instance in enumerate(stream):
            # Annotate errors with the instance position, as InstanceOperator does.
            with error_context(self, instance=index):
                yield self.process(self.verify_instance(instance), multi_stream)

    @abstractmethod
    def process(self, instance: dict, multi_stream: MultiStream) -> dict:
        pass
544

545

546
class SequentialMixin(Artifact):
    """Holds an ordered list of operator steps and an optional cap on how many run.

    Attributes:
        max_steps: number of leading steps to run; None means all of them.
        steps: the operators, in execution order.
    """

    max_steps: Optional[int] = None
    steps: List[StreamingOperator] = field(default_factory=list)

    def num_steps(self) -> int:
        """Return the number of defined steps."""
        return len(self.steps)

    def set_max_steps(self, max_steps):
        """Limit execution to the first ``max_steps`` steps (1 <= max_steps <= num_steps)."""
        assert (
            max_steps <= self.num_steps()
        ), f"Max steps requested ({max_steps}) is larger than defined steps {self.num_steps()}"
        assert max_steps >= 1, f"Max steps requested ({max_steps}) is less than 1"
        self.max_steps = max_steps

    def get_last_step_description(self):
        """Return ``__description__`` of the last step that will actually run."""
        if self.max_steps is None:
            last_index = len(self.steps) - 1
        else:
            last_index = self.max_steps - 1
        return self.steps[last_index].__description__

    def _get_max_steps(self):
        return len(self.steps) if self.max_steps is None else self.max_steps
568

569

570
class SequentialOperator(MultiStreamOperator, SequentialMixin):
    """Applies its steps one after another to a ``MultiStream``.

    The output of each step is fed to the next; only the first
    ``_get_max_steps()`` steps are run.
    """

    def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        active_steps = self.steps[: self._get_max_steps()]
        for step in active_steps:
            multi_stream = step(multi_stream)
        return multi_stream
581

582

583
class SourceSequentialOperator(SourceOperator, SequentialMixin):
    """A sequential pipeline whose first step is a source operator.

    The first step is called with no arguments to produce the initial
    ``MultiStream``; each following step (up to ``_get_max_steps()``) then
    transforms the previous output.
    """

    def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        assert (
            self.num_steps() > 0
        ), "Calling process on a SourceSequentialOperator without any steps"
        source_step = self.steps[0]
        output = source_step()
        for step in self.steps[1 : self._get_max_steps()]:
            output = step(output)
        return output
599

600

601
class SequentialOperatorInitializer(SequentialOperator):
    """A ``SequentialOperator`` whose first step is a ``StreamInitializerOperator``.

    Arguments given to the call are forwarded to that first step, which builds
    the initial ``MultiStream``; the remaining steps (up to
    ``_get_max_steps()``) then transform it in order.
    """

    def __call__(self, *args, **kwargs) -> MultiStream:
        return self.process(*args, **kwargs)

    def process(self, *args, **kwargs) -> MultiStream:
        assert (
            self.num_steps() > 0
        ), "Calling process on a SequentialOperatorInitializer without any steps"

        initializer = self.steps[0]
        assert isinstance(
            initializer, StreamInitializerOperator
        ), "The first step in a SequentialOperatorInitializer must be a StreamInitializerOperator"
        output = initializer(*args, **kwargs)
        for step in self.steps[1 : self._get_max_steps()]:
            output = step(output)
        return output
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc