• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

speedyk-005 / chunklet-py / 20357264638

19 Dec 2025 01:56AM UTC coverage: 81.036% (-0.3%) from 81.333%
20357264638

push

github

speedyk-005
fix: Add python-multipart dependency for visualizer

1299 of 1603 relevant lines covered (81.04%)

4.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.95
/src/chunklet/common/batch_runner.py
1
from typing import Any, Literal, Callable
5✔
2
from collections.abc import Iterable, Generator
5✔
3

4
# mpire is lazy imported
5

6
from loguru import logger
5✔
7
from chunklet.common.validation import safely_count_iterable
5✔
8

9

10
def capture_result_and_exception(func):
5✔
11
    """Decorator to capture result and exception from a function call."""
12

13
    def wrapper(*args, **kwargs):
5✔
14
        try:
×
15
            res = func(*args, **kwargs)
×
16
            return res, None
×
17
        except Exception as e:
×
18
            return None, e
×
19

20
    return wrapper
5✔
21

22

23
def run_in_batch(
5✔
24
    func: Callable,
25
    iterable_of_args: Iterable,
26
    iterable_name: str,
27
    n_jobs: int | None = None,
28
    show_progress: bool = True,
29
    on_errors: Literal["raise", "skip", "break"] = "raise",
30
    separator: Any = None,
31
    verbose: bool = True,
32
) -> Generator[Any, None, None]:
33
    """
34
    Processes a batch of items in parallel using multiprocessing.
35
    Splits the iterable into chunks and executes the function on each.
36

37
    Args:
38
        func (Callable): The function to call for each argument.
39
        iterable_of_args (Iterable): An iterable of inputs to process.
40
        iterable_name: Name of the iterable. needed for logging and exception message.
41
        n_jobs (int | None): Number of parallel workers to use.
42
            If None, uses all available CPUs. Must be >= 1 if specified.
43
        show_progress (bool): Whether to display a progress bar.
44
        on_errors (Literal["raise", "skip", "break"]):
45
            How to handle errors during processing. Defaults to "raise".
46
        separator (Any): A value to be yielded after the chunks of each text are processed.
47
            Note: None cannot be used as a separator.
48
        verbose (bool): Whether to enable verbose logging.
49

50
    Yields:
51
        Any: A `Box` object containing the chunk content and metadata, or any separator object.
52
    """
53
    from mpire import WorkerPool
5✔
54

55
    total, iterable_of_args = safely_count_iterable(iterable_name, iterable_of_args)
5✔
56

57
    if verbose:
5✔
58
        logger.info("Starting batch chunking for {} items.", total)
×
59

60
    if total == 0:
5✔
61
        if verbose:
5✔
62
            logger.info("Input {} is empty. Returning empty iterator.", iterable_name)
×
63
        return iter([])
5✔
64

65
    failed_count = 0
5✔
66
    try:
5✔
67
        with WorkerPool(n_jobs=n_jobs) as pool:
5✔
68
            imap_func = pool.imap if separator is not None else pool.imap_unordered
5✔
69

70
            progress_bar_options = {
5✔
71
                "bar_format": "{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}, {rate_fmt}]",
72
                "desc": "Chunking ...",
73
            }
74

75
            task_iter = imap_func(
5✔
76
                capture_result_and_exception(func),
77
                iterable_of_args,
78
                iterable_len=total,
79
                progress_bar=show_progress,
80
                progress_bar_options=progress_bar_options,
81
            )
82

83
            for res, error in task_iter:
5✔
84
                if error:
5✔
85
                    failed_count += 1
5✔
86
                    if on_errors == "raise":
5✔
87
                        raise error
5✔
88
                    elif on_errors == "break":
5✔
89
                        logger.error(
5✔
90
                            "A task for {} failed. Returning partial results.\nReason: {}",
91
                            iterable_name,
92
                            error,
93
                        )
94
                        break
5✔
95

96
                    #  Else: skip
97
                    logger.warning("Skipping a failed task.\nReason: {}", error)
5✔
98
                    continue
5✔
99

100
                yield from res
5✔
101

102
                if separator is not None:
5✔
103
                    yield separator
5✔
104

105
    finally:
106
        if verbose:
5✔
107
            logger.info(
×
108
                "Batch processing completed. {}/{} items processed successfully.",
109
                total - failed_count,
110
                total,
111
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc