KarlNaumann / MacroStat / build 17696704914

13 Sep 2025 12:44PM UTC coverage: 95.711% (-1.0%) from 96.722%

push · github · web-flow
Testing adjustments (#50)

* fix: restrict parameters passed to core to those included in the defaults
* fix: adjust for potentially mismatched dimensions over the course of the timeseries
* fix: tests for batchprocessing
* fix: adjust sampler tests for the split into transform and save steps
* fix: correct the step range to avoid skipping a timestep
* fix: drop from_json in core.Model as the parameters are now model-dependent
* fix: adapt the model-core tests to the parameter and variable changes (gather_timeseries, and parameters now rejects parameters not in the default_parameters)
* fix: NK3E adjusted for the revised variable treatment

247 of 249 branches covered (99.2%)

Branch coverage included in aggregate %.

14 of 14 new or added lines in 3 files covered. (100.0%)

23 existing lines in 4 files now uncovered.

1650 of 1733 relevant lines covered (95.21%)

0.95 hits per line

Source File: /src/macrostat/sample/sampler.py (93.23% covered)

In this file, the uncovered lines (marked × in the report) fall in the save_to_disk=False in-memory path, the externally supplied points branch, the verbose progress logging, and the unnamed-index fallback in transform_outputs.
"""
Class designed to facilitate the sampling of the model's
parameter space.
"""

__author__ = ["Karl Naumann-Woleske"]
__credits__ = ["Karl Naumann-Woleske"]
__license__ = "MIT"
__maintainer__ = ["Karl Naumann-Woleske"]

# Default libraries
import copy
import gc
import logging
import multiprocessing as mp
import os
from datetime import datetime as dt
from pathlib import Path

# Third-party libraries
import pandas as pd

import macrostat.util.batchprocessing as msbatchprocessing
from macrostat.core import Model

logger = logging.getLogger(__name__)

class BaseSampler:
    def __init__(
        self,
        model: Model,
        bounds: dict | None = None,
        logspace: bool = False,
        worker_function: callable = msbatchprocessing.timeseries_worker,
        simulation_args: tuple = (),
        output_folder: str = "samples",
        cpu_count: int = 1,
        batchsize: int | None = None,
        save_to_disk: bool = True,
        output_filetype: str = "csv",
        output_compression: str | None = None,
    ):
        """Generalized class to facilitate the sampling of the model's
        parameter space using Python's multiprocessing library.

        Parameters
        ----------
        model: Model
            Model to be sampled
        bounds: dict | None (default None)
            Dictionary of (lower, upper) bounds per parameter. If None,
            the bounds are taken from the model's parameters
        logspace: bool (default False)
            Whether to sample the parameters in logspace
        worker_function: callable (default batchprocessing.timeseries_worker)
            Function to be used for the parallel processing
        simulation_args: tuple (default ())
            Additional positional arguments appended to each task
        output_folder: str (default "samples")
            Folder to save the output files
        cpu_count: int (default 1)
            Number of CPUs to use for the parallel processing
        batchsize: int (default None)
            Size of each batch to be processed in parallel
        save_to_disk: bool (default True)
            Save each of the batches to disk individually
        output_filetype: str (default "csv")
            Filetype to use for the output files. Options are
            "csv", "parquet"
        output_compression: str (default None)
            Compression method to use for the output files. Options are
            None (default), "gzip" or "zstd"
        """
        # Model parameters
        self.model = model
        self.modelclass = type(model)
        self.base_parameters = copy.deepcopy(model.parameters)

        # Boundaries for the parameters
        self.logspace = logspace
        self.bounds = (
            bounds if bounds is not None else self.model.parameters.get_bounds()
        )
        self.verify_bounds(self.bounds)

        # Computation parameters
        self.worker_function = worker_function
        self.cpu_count = min([mp.cpu_count(), cpu_count])
        self.batchsize = batchsize
        self.simulation_args = simulation_args

        # Set up the output folder
        self.save_to_disk = save_to_disk
        self.output_folder = Path(output_folder)
        self.output_filetype = output_filetype
        self.output_compression = output_compression
        os.makedirs(output_folder, exist_ok=True)

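A minimal usage sketch for orientation (hypothetical, not part of the file: MyModel and the parameter names are placeholders for a concrete macrostat.core.Model subclass and its parameters):

# Hypothetical sketch: constructing a sampler around a concrete model.
# BaseSampler itself cannot sample (generate_parameters is abstract);
# in practice one instantiates a subclass such as the sketch further below.
model = MyModel()  # placeholder for any macrostat.core.Model subclass
sampler = BaseSampler(
    model=model,
    bounds={"alpha": (0.1, 0.9), "beta": (0.5, 2.0)},  # (lower, upper) per parameter
    cpu_count=4,                # capped at mp.cpu_count()
    batchsize=100,              # None would process all points in one batch
    output_filetype="parquet",
    output_compression="zstd",
)
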
    def generate_parameters(self):
        """Generate parameters for the parallel processor"""
        raise NotImplementedError("This method should be implemented in a subclass")

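generate_parameters defines the subclass contract. Judging by how generate_tasks consumes self.points, it should return a DataFrame with one row per sampled point and one column per bounded parameter. A sketch of a possible subclass (hypothetical, not part of the file; uniform draws, ignoring the logspace option for brevity):

import numpy as np

class UniformRandomSampler(BaseSampler):
    """Hypothetical subclass: draw points uniformly within the bounds."""

    def __init__(self, *args, n_samples: int = 100, seed: int = 0, **kwargs):
        super().__init__(*args, **kwargs)
        self.n_samples = n_samples
        self.rng = np.random.default_rng(seed)

    def generate_parameters(self) -> pd.DataFrame:
        # One column per parameter, one row per sampled point
        draws = {
            param: self.rng.uniform(low, high, size=self.n_samples)
            for param, (low, high) in self.bounds.items()
        }
        return pd.DataFrame(draws)
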
    def generate_tasks(self, points: pd.DataFrame):
        """Generate tasks for the parallel processor based on the parameters
        generated by the `generate_parameters` method.

        Parameters
        ----------
        points: pd.DataFrame
            DataFrame containing the points to be processed

        Returns
        -------
        list[tuple]
            List of (point index, model instance, *simulation_args) tuples
            to be processed by the worker function
        """
        tasks = []
        for i in points.index:
            # Keep all the information, just change the values
            values = self.model.parameters.get_default_parameters()
            for k, v in points.loc[i].to_dict().items():
                values[k]["value"] = v
                values[k]["lower bound"] = self.bounds[k][0]
                values[k]["upper bound"] = self.bounds[k][1]
            newparams = self.model.parameters.__class__(
                parameters=values,
                hyperparameters=self.model.parameters.hyper,
            )

            # Create new model instance with new parameters
            newmodel = self.model.__class__(
                parameters=newparams,
                scenarios=self.model.scenarios,
                variables=self.model.variables,
                log_level=logging.CRITICAL,  # Suppress logging
            )

            # Generate the task to execute
            tasks.append((i, newmodel, *self.simulation_args))

        return tasks

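Each task is a tuple the worker function can unpack: the point's index, a freshly parameterized model instance, then any simulation_args. A toy illustration (hypothetical parameter names):

# Hypothetical: two points over parameters "alpha" and "beta"
points = pd.DataFrame({"alpha": [0.2, 0.8], "beta": [1.0, 1.5]})
tasks = sampler.generate_tasks(points)
# tasks == [(0, <model with alpha=0.2, beta=1.0>),
#           (1, <model with alpha=0.8, beta=1.5>)]
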
    def sample(self, verbose: bool = False, points: pd.DataFrame | None = None):
        """Run in parallel the sampling of the model's parameter space
        by generating a set of tasks and executing them in parallel.

        Parameters
        ----------
        verbose: bool (default False)
            Whether to print progress information
        points: pd.DataFrame (default None)
            Pre-generated points to sample. If None, the points are
            generated by the `generate_parameters` method
        """
        try:
            if points is None:
                self.points = self.generate_parameters()
            else:
                self.points = points

            # Run the parallel processing in batches to conserve memory
            if self.batchsize is None:
                self.batchsize = self.points.shape[0]

            batchcount = int(self.points.shape[0] / self.batchsize) + (
                self.points.shape[0] % self.batchsize > 0
            )

            start_time = dt.now()
            logger.info(
                f"Processing {self.points.shape[0]} tasks starting at {start_time}"
            )
            logger.info(f"Expecting to use {batchcount} batches")

            if not self.save_to_disk:
                all_outputs = {}

            for batch in range(batchcount):
                try:
                    if verbose and batch != 0:
                        elapsed = dt.now() - start_time
                        logger.info(
                            f"Processing batch {batch+1:05d} of {batchcount:05d}. "
                            f"Elapsed {elapsed} ({elapsed/batch} per batch)"
                        )

                    end = min([(batch + 1) * self.batchsize, self.points.shape[0]])
                    batch_tasks = self.generate_tasks(
                        points=self.points.iloc[batch * self.batchsize : end]
                    )

                    # Record the parameter values used in this batch
                    parameters = {
                        v[0]: v[1].parameters.get_values() for v in batch_tasks
                    }
                    pd.DataFrame(parameters).T.to_csv(
                        self.output_folder / f"parameters_{batch}.csv", index_label="id"
                    )

                    # Execute those tasks
                    raw_outputs = msbatchprocessing.parallel_processor(
                        tasks=batch_tasks,
                        worker=self.worker_function,
                        cpu_count=self.cpu_count,
                    )

                    # Save the outputs to disk
                    pd_outputs = self.transform_outputs(raw_outputs, batch=batch)
                    if self.save_to_disk:
                        self.save_outputs(pd_outputs, batch=batch)
                    else:
                        all_outputs[batch] = pd_outputs

                    # Clean up batch resources
                    del raw_outputs
                    gc.collect()

                except Exception as e:
                    logger.error(f"Error processing batch {batch}: {str(e)}")
                    raise

        except Exception as e:
            logger.error(f"Error in sampling process: {str(e)}")
            raise
        finally:
            # Clean up any remaining resources
            logger.info("Performing final cleanup")
            if hasattr(self, "tasks"):
                del self.tasks
            gc.collect()

        if not self.save_to_disk:
            names = ["batch", *all_outputs[0].index.names]
            return pd.concat(all_outputs, axis=0, names=names)

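The batch arithmetic rounds up: 10 points with batchsize 4 give int(10 / 4) + (10 % 4 > 0) = 2 + 1 = 3 batches of sizes 4, 4 and 2. A sketch of an in-memory run, reusing the hypothetical UniformRandomSampler from above:

# Hypothetical end-to-end run that keeps outputs in memory
sampler = UniformRandomSampler(
    model=model,
    bounds={"alpha": (0.1, 0.9)},
    n_samples=10,
    batchsize=4,
    save_to_disk=False,
)
outputs = sampler.sample()
# outputs: one DataFrame with an outer "batch" index level prepended
# to the per-batch ("ID", ...) MultiIndex built by transform_outputs
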
    def transform_outputs(self, raw_outputs: list, batch: int):
        """Concatenate the raw outputs into a single pandas DataFrame.

        Parameters
        ----------
        raw_outputs: list
            List of outputs from the parallel processing. By default,
            batchprocessing.timeseries_worker returns a tuple of
            (*task_arguments, output)
        batch: int
            Batch number to save the outputs. Assumes that
            the batchsize is constant.

        Returns
        -------
        output: pd.DataFrame
        """
        index_names = list(raw_outputs[0][-1].index.names)
        if all(x is None for x in index_names):
            index_names = [f"index{i+1}" for i in range(len(index_names))]
        data = {v[0]: v[-1] for v in raw_outputs}
        data = pd.concat(
            data.values(), keys=data.keys(), names=["ID"] + index_names, axis=0
        )
        return data

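The concatenation keys each worker's output frame by its point index, prepending an "ID" level. A toy illustration with two fake worker outputs:

# Toy illustration (not part of the file): two fake worker outputs
out0 = pd.DataFrame({"y": [1.0, 2.0]}, index=pd.Index([0, 1], name="time"))
out1 = pd.DataFrame({"y": [3.0, 4.0]}, index=pd.Index([0, 1], name="time"))
combined = sampler.transform_outputs([(0, out0), (1, out1)], batch=0)
# combined.index is a ("ID", "time") MultiIndex; combined.loc[1] equals out1
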
    def save_outputs(self, data: pd.DataFrame, batch: int):
        """Save the raw outputs to disk.

        The model's outputs are in the form of a pandas DataFrame.
        This method should save the outputs to disk in a format that
        can be easily read back in later. Generically, it writes a
        CSV file with the outputs in a MultiIndex format. However,
        this can be overridden to save in a different format.

        Parameters
        ----------
        data: pd.DataFrame
            The transformed outputs of the samples run in this batch
        batch: int
            Batch number to save the outputs. Assumes that
            the batchsize is constant.
        """
        # Write the batch to disk in the configured format
        if self.output_filetype == "csv":
            data.to_csv(
                self.output_folder / f"outputs_{batch}.csv",
                compression=self.output_compression,
            )
        elif self.output_filetype == "parquet":
            data.to_parquet(
                self.output_folder / f"outputs_{batch}.parquet",
                compression=self.output_compression,
            )
        else:
            raise ValueError(f"Invalid output filetype: {self.output_filetype}")

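Reading a batch back: parquet round-trips the MultiIndex directly, while CSV needs the index columns named explicitly. A sketch, assuming the two-level ("ID", "time") index from the illustration above:

# Sketch: reloading saved outputs (the index names are assumptions)
df = pd.read_parquet(Path("samples") / "outputs_0.parquet")
df = pd.read_csv(Path("samples") / "outputs_0.csv", index_col=["ID", "time"])
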
    def verify_bounds(self, bounds: dict) -> None:
        """Verify that the bounds are correctly set, in particular:

        1. That each parameter is in the model
        2. That there is a (lower, upper) pair for each parameter
        3. That the lower bound is strictly smaller than the upper bound
        4. If sampling in logspace (self.logspace), that the bounds are
           either both positive or both negative
        5. If sampling in logspace, that neither bound is zero

        Parameters
        ----------
        bounds: dict[str, tuple]
            Dictionary containing the bounds for each parameter to be sampled

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the bounds are not correctly set
        """
        # Check that the bounds are correctly set
        for param, bound in bounds.items():
            if param not in self.model.parameters:
                raise ValueError(f"Parameter {param} not in the model's parameters")
            if len(bound) != 2:
                raise ValueError(
                    f"Bounds should be a list-like of length 2. {param}: {bound}"
                )
            if self.logspace and (bound[0] < 0) != (bound[1] < 0):
                msg = "Bounds should be either both positive or both negative"
                raise ValueError(f"{msg}. {param}: {bound}")
            if self.logspace and (bound[0] == 0 or bound[1] == 0):
                raise ValueError(
                    f"Bounds cannot be zero when using logspace. {param}: {bound}"
                )
            if bound[0] >= bound[1]:
                msg = "Lower bound should be smaller than the upper bound"
                raise ValueError(f"{msg}. {param}: {bound}")
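
The checks in practice (hypothetical parameter name; assumes "alpha" exists in the model):

sampler.verify_bounds({"alpha": (0.1, 0.9)})   # passes silently
sampler.verify_bounds({"alpha": (0.9, 0.1)})   # ValueError: lower >= upper
# With logspace=True, these also raise:
#   {"alpha": (-1.0, 2.0)}  -> mixed signs
#   {"alpha": (0.0, 2.0)}   -> zero bound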