• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

speedyk-005 / chunklet-py / 24848119348

23 Apr 2026 05:02PM UTC coverage: 90.715% (-0.04%) from 90.752%
24848119348

push

github

speedyk-005
chore: drop Python 3.10 support due to recurring CI hangs

Minimum Python version is now 3.11. Python 3.10 is approaching EOL
and had recurring CI hangs due to multiprocessing issues with mpire.

Closes #27

1358 of 1497 relevant lines covered (90.71%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/src/chunklet/common/batch_runner.py
1
from collections.abc import Generator, Iterable
1✔
2
from typing import Any, Callable, Literal
1✔
3

4
# mpire is lazy imported
5
from loguru import logger
1✔
6

7
from chunklet.common.logging_utils import log_info
1✔
8
from chunklet.common.validation import safely_count_iterable
1✔
9

10

11
def capture_result_and_exception(func):
1✔
12
    """Decorator to capture result and exception from a function call."""
13

14
    def wrapper(*args, **kwargs):
1✔
15
        try:
×
16
            res = func(*args, **kwargs)
×
17
            return res, None
×
18
        except Exception as e:
×
19
            return None, e
×
20

21
    return wrapper
1✔
22

23

24
def run_in_batch(
1✔
25
    func: Callable,
26
    iterable_of_args: Iterable,
27
    iterable_name: str,
28
    n_jobs: int | None = None,
29
    show_progress: bool = True,
30
    on_errors: Literal["raise", "skip", "break"] = "raise",
31
    separator: Any = None,
32
    verbose: bool = True,
33
) -> Generator[Any, None, None]:
34
    """
35
    Processes a batch of items in parallel using multiprocessing.
36
    Splits the iterable into chunks and executes the function on each.
37

38
    Args:
39
        func: The function to call for each argument.
40
        iterable_of_args: An iterable of inputs to process.
41
        iterable_name: Name of the iterable. needed for logging and exception message.
42
        n_jobs: Number of parallel workers to use.
43
            If None, uses all available CPUs. Must be >= 1 if specified.
44
        show_progress: Whether to display a progress bar.
45
        on_errors:
46
            How to handle errors during processing. Defaults to "raise".
47
        separator: A value to be yielded after the chunks of each text are processed.
48
            Note: None cannot be used as a separator.
49
        verbose: Whether to enable verbose logging.
50

51
    Yields:
52
        A `DotDict` object containing the chunk content and metadata, or any separator object.
53
    """
54
    from mpire import WorkerPool
1✔
55

56
    total, iterable_of_args = safely_count_iterable(iterable_name, iterable_of_args)
1✔
57

58
    log_info(verbose, "Starting batch chunking for {} items.", total)
1✔
59

60
    if total == 0:
1✔
61
        log_info(verbose, "Input {} is empty. Returning empty iterator.", iterable_name)
1✔
62
        return iter([])
1✔
63

64
    failed_count = 0
1✔
65
    try:
1✔
66
        with WorkerPool(n_jobs=n_jobs) as pool:
1✔
67
            imap_func = pool.imap if separator is not None else pool.imap_unordered
1✔
68

69
            progress_bar_options = {
1✔
70
                "bar_format": "{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}, {rate_fmt}]",
71
                "desc": "Chunking ...",
72
            }
73

74
            task_iter = imap_func(
1✔
75
                capture_result_and_exception(func),
76
                iterable_of_args,
77
                iterable_len=total,
78
                progress_bar=show_progress,
79
                progress_bar_options=progress_bar_options,
80
            )
81

82
            for res, error in task_iter:
1✔
83
                if error:
1✔
84
                    failed_count += 1
1✔
85
                    if on_errors == "raise":
1✔
86
                        raise error
1✔
87
                    elif on_errors == "break":
1✔
88
                        logger.error(
1✔
89
                            "A task for {} failed. Returning partial results.\nReason: {}",
90
                            iterable_name,
91
                            error,
92
                        )
93
                        break
1✔
94

95
                    #  Else: skip
96
                    logger.warning("Skipping a failed task.\nReason: {}", error)
1✔
97
                    continue
1✔
98

99
                yield from res
1✔
100

101
                if separator is not None:
1✔
102
                    yield separator
1✔
103

104
    finally:
105
        log_info(
1✔
106
            verbose,
107
            "Batch processing completed. {}/{} items processed successfully.",
108
            total - failed_count,
109
            total,
110
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc