• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IBM / unitxt / 16719557902

04 Aug 2025 09:38AM UTC coverage: 81.089% (-0.1%) from 81.22%
16719557902

Pull #1861

github

web-flow
Merge 36770247c into 332165f92
Pull Request #1861: Fix compatibility with datasets 4.0

1591 of 1972 branches covered (80.68%)

Branch coverage included in aggregate %.

10754 of 13252 relevant lines covered (81.15%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.4
src/unitxt/operator.py
1
from abc import abstractmethod
1✔
2
from dataclasses import field
1✔
3
from typing import Any, Dict, Generator, List, Optional, Union
1✔
4

5
from .artifact import Artifact
1✔
6
from .dataclass import FinalField, InternalField, NonPositionalField
1✔
7
from .error_utils import error_context
1✔
8
from .settings_utils import get_constants
1✔
9
from .stream import DynamicStream, EmptyStreamError, MultiStream, Stream
1✔
10
from .utils import DistributionNotFound, VersionConflict, require
1✔
11

12
constants = get_constants()
1✔
13

14

15
class Operator(Artifact):
1✔
16
    pass
17

18

19
class PackageRequirementsMixin(Artifact):
1✔
20
    """Base class used to automatically check for the existence of required Python dependencies for an artifact (e.g., Operator or Metric).
21

22
    The _requirements_list is either a list of required packages or a dictionary mapping required packages to installation instructions.
23
    The _requirements_list should be used at class level definition, and the requirements at instance creation.
24

25
    - **List format**: Just specify the package names, optionally with version annotations (e.g., ["torch>=1.2.4", "numpy<1.19"]).
26
    - **Dict format**: Specify package names as keys and installation instructions as values
27
      (e.g., {"torch>=1.2.4": "Install torch with `pip install torch>=1.2.4`"}).
28

29
    When a package version annotation is specified (like `torch>=1.2.4`), the `check_missing_requirements` method
30
    verifies that the installed version meets the specified constraint.
31
    """
32

33
    _requirements_list: Union[List[str], Dict[str, str]] = InternalField(
1✔
34
        default_factory=list
35
    )
36
    requirements: Union[List[str], Dict[str, str]] = FinalField(
1✔
37
        also_positional=False, default_factory=list
38
    )
39

40
    def prepare(self):
1✔
41
        self.check_missing_requirements(self._requirements_list)
1✔
42
        self.check_missing_requirements(self.requirements)
1✔
43
        super().prepare()
1✔
44

45
    def check_missing_requirements(self, requirements=None):
1✔
46
        if requirements is None:
1✔
47
            requirements = self._requirements_list
1✔
48
        if isinstance(requirements, list):
1✔
49
            requirements = {package: "" for package in requirements}
1✔
50

51
        missing_packages = []
1✔
52
        version_mismatched_packages = []
1✔
53
        installation_instructions = []
1✔
54

55
        for package, installation_instruction in requirements.items():
1✔
56
            try:
1✔
57
                # Use pkg_resources.require to verify the package requirement
58
                require(package)
1✔
59
            except DistributionNotFound:
1✔
60
                missing_packages.append(package)
1✔
61
                installation_instructions.append(
1✔
62
                    installation_instruction
63
                    or f"Install {package} with `pip install {package}`"
64
                )
65
            except VersionConflict as e:
1✔
66
                version_mismatched_packages.append(
1✔
67
                    f"{package} (installed: {e.dist.version}, required: {e.req})"
68
                )
69
                installation_instructions.append(
1✔
70
                    installation_instruction
71
                    or f"Update {package} to the required version with `pip install '{package}'`"
72
                )
73

74
        if missing_packages or version_mismatched_packages:
1✔
75
            raise MissingRequirementsError(
1✔
76
                self.__class__.__name__,
77
                missing_packages,
78
                version_mismatched_packages,
79
                installation_instructions,
80
            )
81

82

83
class MissingRequirementsError(Exception):
1✔
84
    def __init__(
1✔
85
        self,
86
        class_name,
87
        missing_packages,
88
        version_mismatched_packages,
89
        installation_instructions,
90
    ):
91
        self.class_name = class_name
1✔
92
        self.missing_packages = missing_packages
1✔
93
        self.version_mismatched_packages = version_mismatched_packages
1✔
94
        self.installation_instructions = installation_instructions
1✔
95

96
        missing_message = (
1✔
97
            f"Missing package(s): {', '.join(self.missing_packages)}."
98
            if self.missing_packages
99
            else ""
100
        )
101
        version_message = (
1✔
102
            f"Version mismatch(es): {', '.join(self.version_mismatched_packages)}."
103
            if self.version_mismatched_packages
104
            else ""
105
        )
106

107
        self.message = (
1✔
108
            f"{self.class_name} requires the following dependencies:\n"
109
            f"{missing_message}\n{version_message}\n"
110
            + "\n".join(self.installation_instructions)
111
        )
112
        super().__init__(self.message)
1✔
113

114

115
class OperatorError(Exception):
1✔
116
    def __init__(self, exception: Exception, operators: List[Operator]):
1✔
117
        super().__init__(
×
118
            "This error was raised by the following operators: "
119
            + ",\n".join([str(operator) for operator in operators])
120
            + "."
121
        )
122
        self.exception = exception
×
123
        self.operators = operators
×
124

125
    @classmethod
1✔
126
    def from_operator_error(cls, exception: Exception, operator: Operator):
1✔
127
        return cls(exception.exception, [*exception.operators, operator])
×
128

129
    @classmethod
1✔
130
    def from_exception(cls, exception: Exception, operator: Operator):
1✔
131
        return cls(exception, [operator])
×
132

133

134
class StreamingOperator(Operator, PackageRequirementsMixin):
1✔
135
    """Base class for all stream operators in the streaming model.
136

137
    Stream operators are a key component of the streaming model and are responsible for processing continuous data streams.
138
    They perform operations such as transformations, aggregations, joins, windowing and more on these streams.
139
    There are several types of stream operators, including source operators, processing operators, etc.
140

141
    As a `StreamingOperator`, this class is responsible for performing operations on a stream, and must be implemented by all other specific types of stream operators in the system.
142
    When called, a `StreamingOperator` must return a MultiStream.
143

144
    As a subclass of `Artifact`, every `StreamingOperator` can be saved in a catalog for further usage or reference.
145

146
    """
147

148
    @abstractmethod
1✔
149
    def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
1✔
150
        """Abstract method that performs operations on the stream.
151

152
        Args:
153
            streams (Optional[MultiStream]): The input MultiStream, which can be None.
154

155
        Returns:
156
            MultiStream: The output MultiStream resulting from the operations performed on the input.
157
        """
158

159

160
class SideEffectOperator(StreamingOperator):
1✔
161
    """Base class for operators that does not affect the stream."""
162

163
    def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
1✔
164
        self.process()
×
165
        return streams
×
166

167
    @abstractmethod
1✔
168
    def process() -> None:
1✔
169
        pass
170

171

172
def instance_generator(instance):
1✔
173
    yield instance
1✔
174

175

176
def stream_single(instance: Dict[str, Any]) -> Stream:
1✔
177
    return DynamicStream(
1✔
178
        generator=instance_generator, gen_kwargs={"instance": instance}
179
    )
180

181

182
class MultiStreamOperator(StreamingOperator):
1✔
183
    """A class representing a multi-stream operator in the streaming system.
184

185
    A multi-stream operator is a type of `StreamingOperator` that operates on an entire MultiStream object at once. It takes a `MultiStream` as input and produces a `MultiStream` as output. The `process` method should be implemented by subclasses to define the specific operations to be performed on the input `MultiStream`.
186
    """
187

188
    caching: bool = NonPositionalField(default=None)
1✔
189

190
    def __call__(
1✔
191
        self, multi_stream: Optional[MultiStream] = None, **instance: Dict[str, Any]
192
    ) -> Union[MultiStream, Dict[str, Any]]:
193
        self.before_process_multi_stream()
1✔
194
        if instance:
1✔
195
            if multi_stream is not None:
×
196
                return self.process_instance(instance)
×
197
        result = self._process_multi_stream(multi_stream)
1✔
198
        if self.caching is not None:
1✔
199
            result.set_caching(self.caching)
1✔
200
        return result
1✔
201

202
    def before_process_multi_stream(self):
1✔
203
        pass
204

205
    def _process_multi_stream(
1✔
206
        self, multi_stream: Optional[MultiStream] = None
207
    ) -> MultiStream:
208
        result = self.process(multi_stream)
1✔
209
        assert isinstance(
1✔
210
            result, MultiStream
211
        ), "MultiStreamOperator must return a MultiStream"
212
        return result
1✔
213

214
    @abstractmethod
1✔
215
    def process(self, multi_stream: MultiStream) -> MultiStream:
1✔
216
        pass
217

218
    def process_instance(self, instance, stream_name=constants.instance_stream):
1✔
219
        instance = self.verify_instance(instance)
1✔
220
        multi_stream = MultiStream({stream_name: stream_single(instance)})
1✔
221
        processed_multi_stream = self(multi_stream)
1✔
222
        return instance_result(processed_multi_stream[stream_name])
1✔
223

224

225
class SourceOperator(MultiStreamOperator):
1✔
226
    """A class representing a source operator in the streaming system.
227

228
    A source operator is responsible for generating the data stream from some source, such as a database or a file.
229
    This is the starting point of a stream processing pipeline.
230
    The ``SourceOperator`` class is a type of ``MultiStreamOperator``, which is a special type of ``StreamingOperator``
231
    that generates an output stream but does not take any input streams.
232

233
    When called, a ``SourceOperator`` invokes its ``process`` method, which should be implemented by all subclasses
234
    to generate the required ``MultiStream``.
235

236
    """
237

238
    def _process_multi_stream(
1✔
239
        self, multi_stream: Optional[MultiStream] = None
240
    ) -> MultiStream:
241
        result = self.process()
1✔
242
        assert isinstance(
1✔
243
            result, MultiStream
244
        ), "MultiStreamOperator must return a MultiStream"
245
        return result
1✔
246

247
    @abstractmethod
1✔
248
    def process(self) -> MultiStream:
1✔
249
        pass
250

251
    def get_splits(self):
1✔
252
        return list(self.process().keys())
×
253

254

255
class StreamInitializerOperator(SourceOperator):
1✔
256
    """A class representing a stream initializer operator in the streaming system.
257

258
    A stream initializer operator is a special type of ``SourceOperator`` that is capable
259
    of taking parameters during the stream generation process.
260
    This can be useful in situations where the stream generation process needs to be
261
    customized or configured based on certain parameters.
262

263
    When called, a ``StreamInitializerOperator`` invokes its ``process`` method, passing any supplied
264
    arguments and keyword arguments. The ``process`` method should be implemented by all subclasses
265
    to generate the required ``MultiStream`` based on the given arguments and keyword arguments.
266

267
    """
268

269
    caching: bool = NonPositionalField(default=None)
1✔
270

271
    def __call__(self, *args, **kwargs) -> MultiStream:
1✔
272
        multi_stream = self.process(*args, **kwargs)
1✔
273
        if self.caching is not None:
1✔
274
            multi_stream.set_caching(self.caching)
×
275
        return self.process(*args, **kwargs)
1✔
276

277
    @abstractmethod
1✔
278
    def process(self, *args, **kwargs) -> MultiStream:
1✔
279
        pass
280

281

282
def instance_result(result_stream):
1✔
283
    result = list(result_stream)
1✔
284
    if len(result) == 0:
1✔
285
        return None
×
286
    if len(result) == 1:
1✔
287
        return result[0]
1✔
288
    return result
×
289

290

291
class StreamOperator(MultiStreamOperator):
1✔
292
    """A class representing a single-stream operator in the streaming system.
293

294
    A single-stream operator is a type of ``MultiStreamOperator`` that operates on individual
295
    ``Stream`` objects within a ``MultiStream``. It iterates through each ``Stream`` in the ``MultiStream``
296
    and applies the ``process`` method.
297

298
    The ``process`` method should be implemented by subclasses to define the specific operations
299
    to be performed on each ``Stream``.
300

301
    """
302

303
    apply_to_streams: List[str] = NonPositionalField(
1✔
304
        default=None
305
    )  # None apply to all streams
306
    dont_apply_to_streams: List[str] = NonPositionalField(default=None)
1✔
307

308
    def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
1✔
309
        result = {}
1✔
310
        for stream_name, stream in multi_stream.items():
1✔
311
            if self._is_should_be_processed(stream_name):
1✔
312
                stream = self._process_single_stream(stream, stream_name)
1✔
313
            else:
314
                stream = stream
1✔
315
            assert isinstance(stream, Stream), "StreamOperator must return a Stream"
1✔
316
            result[stream_name] = stream
1✔
317

318
        return MultiStream(result)
1✔
319

320
    def _process_single_stream(
1✔
321
        self, stream: Stream, stream_name: Optional[str] = None
322
    ) -> Stream:
323
        return DynamicStream(
1✔
324
            self._process_stream,
325
            gen_kwargs={"stream": stream, "stream_name": stream_name},
326
        )
327

328
    def _is_should_be_processed(self, stream_name):
1✔
329
        if (
1✔
330
            self.apply_to_streams is not None
331
            and self.dont_apply_to_streams is not None
332
            and stream_name in self.apply_to_streams
333
            and stream_name in self.dont_apply_to_streams
334
        ):
335
            raise ValueError(
336
                f"Stream '{stream_name}' can be in either apply_to_streams or dont_apply_to_streams not both."
337
            )
338

339
        return (
1✔
340
            self.apply_to_streams is None or stream_name in self.apply_to_streams
341
        ) and (
342
            self.dont_apply_to_streams is None
343
            or stream_name not in self.dont_apply_to_streams
344
        )
345

346
    def _process_stream(
1✔
347
        self, stream: Stream, stream_name: Optional[str] = None
348
    ) -> Generator:
349
        with error_context(self, stream=stream_name):
1✔
350
            yield from self.process(stream, stream_name)
1✔
351

352
    @abstractmethod
1✔
353
    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
1✔
354
        pass
355

356
    def process_instance(self, instance, stream_name=constants.instance_stream):
1✔
357
        instance = self.verify_instance(instance)
×
358
        processed_stream = self._process_single_stream(
×
359
            stream_single(instance), stream_name
360
        )
361
        return instance_result(processed_stream)
×
362

363

364
class SingleStreamOperator(StreamOperator):
1✔
365
    pass
366

367

368
class PagedStreamOperator(StreamOperator):
1✔
369
    """A class representing a paged-stream operator in the streaming system.
370

371
    A paged-stream operator is a type of ``StreamOperator`` that operates on a page of instances
372
    in a ``Stream`` at a time, where a page is a subset of instances.
373
    The ``process`` method should be implemented by subclasses to define the specific operations
374
    to be performed on each page.
375

376
    Args:
377
        page_size (int):
378
            The size of each page in the stream. Defaults to 1000.
379

380
    """
381

382
    page_size: int = 1000
1✔
383

384
    def _process_stream(
1✔
385
        self, stream: Stream, stream_name: Optional[str] = None
386
    ) -> Generator:
387
        page = []
1✔
388
        page_number = 0
1✔
389
        for instance in stream:
1✔
390
            page.append(instance)
1✔
391
            if len(page) >= self.page_size:
1✔
392
                with error_context(
1✔
393
                    self,
394
                    stream=stream_name,
395
                    page=page_number,
396
                    page_size=len(page),
397
                ):
398
                    yield from self.process(page, stream_name)
1✔
399
                page = []
1✔
400
                page_number += 1
1✔
401
        if page:  # Handle any remaining instances in the last partial page
1✔
402
            with error_context(
1✔
403
                self,
404
                stream=stream_name,
405
                page=page_number,
406
                page_size=len(page),
407
                final_page=True,
408
            ):
409
                yield from self._process_page(page, stream_name)
1✔
410

411
    def _process_page(
1✔
412
        self, page: List[Dict], stream_name: Optional[str] = None
413
    ) -> Generator:
414
        yield from self.process(page, stream_name)
1✔
415

416
    @abstractmethod
1✔
417
    def process(self, page: List[Dict], stream_name: Optional[str] = None) -> Generator:
1✔
418
        pass
419

420
    def process_instance(self, instance, stream_name=constants.instance_stream):
1✔
421
        instance = self.verify_instance(instance)
×
422
        processed_stream = self._process_page([instance], stream_name)
×
423
        return instance_result(processed_stream)
×
424

425

426
class SingleStreamReducer(StreamingOperator):
1✔
427
    """A class representing a single-stream reducer in the streaming system.
428

429
    A single-stream reducer is a type of ``StreamingOperator`` that operates on individual
430
    ``Stream`` objects within a ``MultiStream`` and reduces each ``Stream`` to a single output value.
431

432
    The ``process`` method should be implemented by subclasses to define the specific reduction operation
433
    to be performed on each ``Stream``.
434

435
    """
436

437
    def __call__(self, multi_stream: Optional[MultiStream] = None) -> Dict[str, Any]:
1✔
438
        result = {}
×
439
        for stream_name, stream in multi_stream.items():
×
440
            stream = self.process(stream)
×
441
            result[stream_name] = stream
×
442

443
        return result
×
444

445
    @abstractmethod
1✔
446
    def process(self, stream: Stream) -> Stream:
1✔
447
        pass
448

449

450
class InstanceOperator(StreamOperator):
1✔
451
    """A class representing a stream instance operator in the streaming system.
452

453
    A stream instance operator is a type of ``StreamOperator`` that operates on individual instances
454
    within a ``Stream``. It iterates through each instance in the ``Stream`` and applies the ``process`` method.
455
    The ``process`` method should be implemented by subclasses to define the specific operations
456
    to be performed on each instance.
457
    """
458

459
    def _process_stream(
1✔
460
        self, stream: Stream, stream_name: Optional[str] = None
461
    ) -> Generator:
462
        for _index, instance in enumerate(stream):
1✔
463
            with error_context(self, stream=stream_name, instance=_index):
1✔
464
                yield self._process_instance(instance, stream_name)
1✔
465

466
    def _process_instance(
1✔
467
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
468
    ) -> Dict[str, Any]:
469
        instance = self.verify_instance(instance)
1✔
470
        return self.process(instance, stream_name)
1✔
471

472
    @abstractmethod
1✔
473
    def process(
1✔
474
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
475
    ) -> Dict[str, Any]:
476
        pass
477

478
    def process_instance(self, instance, stream_name=constants.instance_stream):
1✔
479
        return self._process_instance(instance, stream_name)
1✔
480

481

482
class InstanceOperatorValidator(InstanceOperator):
1✔
483
    """A class representing a stream instance operator validator in the streaming system.
484

485
    A stream instance operator validator is a type of ``InstanceOperator`` that includes a validation step.
486
    It operates on individual instances within a ``Stream`` and validates the result of processing each instance.
487
    """
488

489
    @abstractmethod
1✔
490
    def validate(self, instance):
1✔
491
        pass
492

493
    def _process_stream(
1✔
494
        self, stream: Stream, stream_name: Optional[str] = None
495
    ) -> Generator:
496
        iterator = iter(stream)
1✔
497
        try:
1✔
498
            first_instance = next(iterator)
1✔
499
        except StopIteration as e:
×
500
            raise EmptyStreamError(f"Stream '{stream_name}' is empty") from e
×
501
        result = self._process_instance(first_instance, stream_name)
1✔
502
        self.validate(result, stream_name)
1✔
503
        yield result
1✔
504
        yield from (
1✔
505
            self._process_instance(instance, stream_name) for instance in iterator
506
        )
507

508

509
class InstanceOperatorWithMultiStreamAccess(StreamingOperator):
1✔
510
    """A class representing an instance operator with global access in the streaming system.
511

512
    An instance operator with global access is a type of `StreamingOperator` that operates on individual instances within a `Stream` and can also access other streams.
513
    It uses the `accessible_streams` attribute to determine which other streams it has access to.
514
    In order to make this efficient and to avoid qudratic complexity, it caches the accessible streams by default.
515
    """
516

517
    def __call__(
1✔
518
        self, multi_stream: Optional[MultiStream] = None, **instance: Dict[str, Any]
519
    ) -> MultiStream:
520
        if instance:
×
521
            raise NotImplementedError("Instance mode is not supported")
×
522

523
        result = {}
×
524

525
        for stream_name, stream in multi_stream.items():
×
526
            stream = DynamicStream(
×
527
                self.generator,
528
                gen_kwargs={"stream": stream, "multi_stream": multi_stream},
529
            )
530
            result[stream_name] = stream
×
531

532
        return MultiStream(result)
×
533

534
    def generator(self, stream, multi_stream):
1✔
535
        yield from (
×
536
            self.process(self.verify_instance(instance), multi_stream)
537
            for instance in stream
538
        )
539

540
    @abstractmethod
1✔
541
    def process(self, instance: dict, multi_stream: MultiStream) -> dict:
1✔
542
        pass
543

544

545
class SequentialMixin(Artifact):
1✔
546
    max_steps: Optional[int] = None
1✔
547
    steps: List[StreamingOperator] = field(default_factory=list)
1✔
548

549
    def num_steps(self) -> int:
1✔
550
        return len(self.steps)
1✔
551

552
    def set_max_steps(self, max_steps):
1✔
553
        assert (
1✔
554
            max_steps <= self.num_steps()
555
        ), f"Max steps requested ({max_steps}) is larger than defined steps {self.num_steps()}"
556
        assert max_steps >= 1, f"Max steps requested ({max_steps}) is less than 1"
1✔
557
        self.max_steps = max_steps
1✔
558

559
    def get_last_step_description(self):
1✔
560
        last_step = (
×
561
            self.max_steps - 1 if self.max_steps is not None else len(self.steps) - 1
562
        )
563
        return self.steps[last_step].__description__
×
564

565
    def _get_max_steps(self):
1✔
566
        return self.max_steps if self.max_steps is not None else len(self.steps)
1✔
567

568

569
class SequentialOperator(MultiStreamOperator, SequentialMixin):
1✔
570
    """A class representing a sequential operator in the streaming system.
571

572
    A sequential operator is a type of `MultiStreamOperator` that applies a sequence of other operators to a
573
    `MultiStream`. It maintains a list of `StreamingOperator`s and applies them in order to the `MultiStream`.
574
    """
575

576
    def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
1✔
577
        for operator in self.steps[0 : self._get_max_steps()]:
1✔
578
            multi_stream = operator(multi_stream)
1✔
579
        return multi_stream
1✔
580

581

582
class SourceSequentialOperator(SourceOperator, SequentialMixin):
1✔
583
    """A class representing a source sequential operator in the streaming system.
584

585
    A source sequential operator is a type of `SequentialOperator` that starts with a source operator.
586
    The first operator in its list of steps is a `SourceOperator`, which generates the initial `MultiStream`
587
    that the other operators then process.
588
    """
589

590
    def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
1✔
591
        assert (
1✔
592
            self.num_steps() > 0
593
        ), "Calling process on a SourceSequentialOperator without any steps"
594
        multi_stream = self.steps[0]()
1✔
595
        for operator in self.steps[1 : self._get_max_steps()]:
1✔
596
            multi_stream = operator(multi_stream)
1✔
597
        return multi_stream
1✔
598

599

600
class SequentialOperatorInitializer(SequentialOperator):
1✔
601
    """A class representing a sequential operator initializer in the streaming system.
602

603
    A sequential operator initializer is a type of `SequntialOperator` that starts with a stream initializer operator. The first operator in its list of steps is a `StreamInitializerOperator`, which generates the initial `MultiStream` based on the provided arguments and keyword arguments.
604
    """
605

606
    def __call__(self, *args, **kwargs) -> MultiStream:
1✔
607
        return self.process(*args, **kwargs)
1✔
608

609
    def process(self, *args, **kwargs) -> MultiStream:
1✔
610
        assert (
1✔
611
            self.num_steps() > 0
612
        ), "Calling process on a SequentialOperatorInitializer without any steps"
613

614
        assert isinstance(
1✔
615
            self.steps[0], StreamInitializerOperator
616
        ), "The first step in a SequentialOperatorInitializer must be a StreamInitializerOperator"
617
        multi_stream = self.steps[0](*args, **kwargs)
1✔
618
        for operator in self.steps[1 : self._get_max_steps()]:
1✔
619
            multi_stream = operator(multi_stream)
1✔
620
        return multi_stream
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc