• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

KarlNaumann / MacroStat / 21338612700

25 Jan 2026 07:55PM UTC coverage: 95.866% (-0.7%) from 96.528%
21338612700

Pull #62

github

web-flow
Merge ee0789e1d into 924f2aeff
Pull Request #62: Difftool multiprocessing

343 of 350 branches covered (98.0%)

Branch coverage included in aggregate %.

160 of 187 new or added lines in 4 files covered. (85.56%)

3 existing lines in 1 file now uncovered.

2208 of 2311 relevant lines covered (95.54%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.67
/src/macrostat/util/batchprocessing.py
1
"""
2
Batch processing functionality
3
"""
4

5
__author__ = ["Karl Naumann-Woleske"]
1✔
6
__credits__ = ["Karl Naumann-Woleske"]
1✔
7
__license__ = "MIT"
1✔
8
__version__ = "0.1.0"
1✔
9
__maintainer__ = ["Karl Naumann-Woleske"]
1✔
10

11
import logging
1✔
12
import sys
1✔
13
import traceback
1✔
14
from contextlib import contextmanager
1✔
15

16
import torch.multiprocessing as mp
1✔
17
from torch.multiprocessing import Pool
1✔
18
from tqdm import tqdm
1✔
19

20
logger = logging.getLogger(__name__)
1✔
21

22

23
@contextmanager
def pool_context(*args, **kwargs):
    """Yield a worker ``Pool`` and guarantee it is torn down on exit.

    All positional and keyword arguments are forwarded verbatim to
    ``torch.multiprocessing.Pool``. Whether the managed body finishes
    normally or raises, the pool is terminated and joined so no worker
    processes are leaked.

    Yields
    ------
    torch.multiprocessing.Pool
        The freshly created process pool.
    """
    worker_pool = Pool(*args, **kwargs)
    try:
        yield worker_pool
    finally:
        # terminate() (rather than close()) mirrors Pool.__exit__:
        # any in-flight work is abandoned once the context exits.
        logger.debug("Cleaning up process pool")
        worker_pool.terminate()
        worker_pool.join()
        logger.debug("Process pool cleanup completed")
34

35

36
def timeseries_worker(task: tuple):
    """Run one model simulation and return its output as a result tuple.

    Intended as the per-task callable for :func:`parallel_processor`.

    Parameters
    ----------
    task : tuple
        ``(name, model, *sim_args)`` where ``name`` labels the
        simulation, ``model`` is the model object to simulate, and the
        remaining items are forwarded to ``model.simulate``.

    Returns
    -------
    tuple
        ``(name, *sim_args, output)`` where ``output`` is
        ``model.variables.to_pandas()`` after the simulation ran.

    Raises
    ------
    Exception
        Whatever ``model.simulate`` raises; the error is logged with a
        traceback before being propagated to the pool.
    """
    name, model, *sim_args = task
    try:
        # The simulate() return value is discarded; results are read
        # from the model's variables container afterwards.
        model.simulate(*sim_args)
        return (name, *sim_args, model.variables.to_pandas())
    except Exception as e:
        # Log inside the worker process: tracebacks raised in a pool
        # worker are otherwise hard to attribute to a specific task.
        logger.error(f"Worker failed for task {name}: {str(e)}")
        logger.error(traceback.format_exc())
        raise
62

63

64
def parallel_processor(
    tasks: list | None = None,
    worker: callable = timeseries_worker,
    cpu_count: int = 1,
    sharing_strategy: str | None = "file_system",
    progress_bar: bool = False,
):
    """Run all of the tasks in parallel using a process pool.

    Parameters
    ----------
    tasks : list | None
        List of tasks to process. Must be non-empty.
    worker : callable
        Worker function to process each task.
    cpu_count : int
        Number of CPU workers to use (capped at ``len(tasks)``).
    sharing_strategy : str | None, optional
        Sharing strategy for torch.multiprocessing. If None, no sharing
        strategy is set (ensures atomic tensors). Default is "file_system".
    progress_bar : bool, optional
        Whether to show progress bar (only if TTY available). Default is False.

    Returns
    -------
    list
        Results from worker function, in task order.

    Raises
    ------
    ValueError
        If ``tasks`` is empty or None.
    """
    # Validate before touching global multiprocessing state, so a bad
    # call leaves the start method / sharing strategy untouched.
    # (Default is None instead of a mutable [] — see "mutable default
    # argument" pitfall; behavior for an omitted argument is unchanged.)
    if not tasks:
        raise ValueError("No tasks to process.")

    # Spawn avoids inheriting CUDA/torch state via fork; force=True
    # overrides a previously chosen method where possible.
    try:
        mp.set_start_method("spawn", force=True)
    except RuntimeError:
        # Start method already fixed by the runtime; keep going.
        pass

    if sharing_strategy is not None:
        mp.set_sharing_strategy(sharing_strategy)

    process_count = min(cpu_count, len(tasks))
    logger.debug(f"Creating process pool with {process_count} workers")

    try:
        with pool_context(processes=process_count) as pool:
            logger.debug("Process pool created successfully")
            if progress_bar and sys.stdout.isatty():
                # imap yields results incrementally so tqdm can tick;
                # list() preserves the eager-return contract of map().
                results = list(
                    tqdm(
                        pool.imap(worker, tasks),
                        total=len(tasks),
                        desc="Processing tasks",
                    )
                )
            else:
                results = pool.map(worker, tasks)
            logger.debug("Parallel processing completed")
            return results
    except Exception as e:
        logger.error(f"Error in process pool: {str(e)}")
        logger.error(traceback.format_exc())
        # Bare raise preserves the original traceback instead of
        # re-raising through this frame with `raise e`.
        raise
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc