KarlNaumann / MacroStat / 26623145945

29 May 2026 07:00AM UTC coverage: 90.685% (-0.2%) from 90.92%
26623145945

push

github

web-flow
ci: migrate from Cirrus to GitHub Actions (#76)

* chore(ci): register slow marker and add py311/py312 tox envs

Pre-flight hygiene for the CI migration off Cirrus. Two provider-independent
fixes that de-risk the upcoming matrix:

- Register the slow pytest marker in setup.cfg so CI logs no longer carry a
  PytestUnknownMarkWarning on every run. The slow tests already exist
  (KirmansAnts stationary KS, two common tests); only the registration was
  missing.
- Add explicit py311 and py312 tox envs mirroring the existing py313 env.
  Without these, tox -e py311 falls through to the base [testenv] and runs
  the full slow suite, which would surprise the matrix CI.

* ci: migrate from Cirrus to GitHub Actions

Cirrus-CI shuts down 2026-05-31. Replace it with a 2-OS x 3-Python matrix
on GitHub Actions, restoring the wheel-install pattern and rewiring the
README badges. No-op for local development.

Workflow shape:
- build job (ubuntu-latest, Python 3.13) runs 'tox -e clean,build' and
  uploads dist/* as an artifact, so every matrix leg installs the SAME
  wheel and catches packaging bugs that per-job rebuilds miss.
- test matrix (ubuntu-latest, windows-latest) x (3.11, 3.12, 3.13) with
  fail-fast: false to mirror Cirrus' independent-task semantics.
- actions/checkout uses fetch-depth: 0 because setuptools_scm is
  configured with version_scheme = 'no-guess-dev' and refuses to build
  on shallow clones without tag history.
- actions/setup-python@v5 caches pip keyed on pyproject.toml + uv.lock.
  The torch CPU wheel is ~200 MB; the cache is the single biggest perf
  win across the matrix.
- Windows enables long-path support via PowerShell before any install,
  porting the Cirrus registry tweak.
- Coverage uploads per-leg with coverallsapp/github-action@v2
  parallel: true; a final coverage-finish job calls parallel-finished
  so Coveralls merges the six legs into one report.
- schedule cron at 04:00 UTC nightly runs the FULL suite (slow + fast)
  on master... (continued)

495 of 538 branches covered (92.01%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

7 existing lines in 2 files now uncovered.

3331 of 3681 relevant lines covered (90.49%)

5.43 hits per line
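
The headline percentage can be reproduced from the counts above. The sketch below assumes, as the note "Branch coverage included in aggregate %" suggests, that the aggregate pools covered lines and covered branches into a single ratio. The helper name `percent` is illustrative only and is not part of Coveralls or MacroStat.

```python
def percent(covered: int, total: int) -> float:
    """Return covered/total expressed as a percentage."""
    return 100.0 * covered / total


# Counts copied from the summary above.
lines_covered, lines_total = 3331, 3681
branches_covered, branches_total = 495, 538

line_pct = percent(lines_covered, lines_total)
branch_pct = percent(branches_covered, branches_total)

# Assumption: the aggregate pools lines and branches into one ratio.
aggregate_pct = percent(
    lines_covered + branches_covered,
    lines_total + branches_total,
)

print(f"lines:     {line_pct:.2f}%")       # 90.49%
print(f"branches:  {branch_pct:.2f}%")     # 92.01%
print(f"aggregate: {aggregate_pct:.3f}%")  # 90.685%
```

Under that assumption the three values match the figures reported for this build: 90.49% of lines, 92.01% of branches and 90.685% overall.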

Source File

/src/macrostat/sample/sampler.py (92.75% covered)
1
"""
2
Class designed to facilitate the sampling of the model's
3
parameter space
4
"""
5

6
__author__ = ["Karl Naumann-Woleske"]
6✔
7
__credits__ = ["Karl Naumann-Woleske"]
6✔
8
__license__ = "MIT"
6✔
9
__maintainer__ = ["Karl Naumann-Woleske"]
6✔
10

11
# Default libraries
12
import copy
6✔
13
import gc
6✔
14
import logging
6✔
15
import multiprocessing as mp
6✔
16
import os
6✔
17
from datetime import datetime as dt
6✔
18
from pathlib import Path
6✔
19

20
# Third-party libraries
21
import pandas as pd
6✔
22

23
import macrostat.util.batchprocessing as msbatchprocessing
6✔
24
from macrostat.core import Model
6✔
25

26
logger = logging.getLogger(__name__)
6✔
27

28

29
class BaseSampler:
6✔
30
    def __init__(
6✔
31
        self,
32
        model: Model,
33
        bounds: dict | None = None,
34
        logspace: bool = False,
35
        worker_function: callable = msbatchprocessing.timeseries_worker,
36
        simulation_args: tuple = (),
37
        output_folder: str = "samples",
38
        cpu_count: int = 1,
39
        batchsize: int | None = None,
40
        save_to_disk: bool = True,
41
        output_filetype: str = "csv",
42
        output_compression: str | None = None,
43
    ):
44
        """Generalized class to facilitate the sampling of the model's
45
        parameter space using Python's multiprocessing library.
46

47
        Parameters
48
        ----------
49
        model: Model
50
            Model to be sampled
51
        worker_function: callable (default batchprocessing.timeseries_worker)
52
            Function to be used for the parallel processing
53
        output_folder: str (default "samples")
54
            Folder to save the output files
55
        cpu_count: int (default 1)
56
            Number of CPUs to use for the parallel processing
57
        batchsize: int (default None)
58
            Size of each batch to be processed in parallel
59
        save_to_disk: bool (default True)
60
            Save each of the batches to disk individually
61
        output_filetype: str (default "csv")
62
            Filetype to use for the output files. Options are
63
            "csv", "parquet"
64
        output_compression: str (default None)
65
            Compression method to use for the output files. Options are
66
            None (default), "gzip" or "zstd"
67
        """
68
        # Model parameters
69
        self.model = model
6✔
70
        self.modelclass = type(model)
6✔
71
        self.base_parameters = copy.deepcopy(model.parameters)
6✔
72

73
        # Boundaries for the parameters
74
        self.logspace = logspace
6✔
75
        if bounds is not None:
6✔
76
            self.bounds = bounds
6✔
77
        else:
78
            all_bounds = self.model.parameters.get_bounds()
6✔
79
            free_names = set(self.model.parameters.get_free_param_names())
6✔
80
            self.bounds = {k: v for k, v in all_bounds.items() if k in free_names}
6✔
81
        self.verify_bounds(self.bounds)
6✔
82

83
        # Computation parameters
84
        self.worker_function = worker_function
6✔
85
        self.cpu_count = min([mp.cpu_count(), cpu_count])
6✔
86
        self.batchsize = batchsize
6✔
87
        self.simulation_args = simulation_args
6✔
88

89
        # Set up the output folder
90
        self.save_to_disk = save_to_disk
6✔
91
        self.output_folder = Path(output_folder)
6✔
92
        self.output_filetype = output_filetype
6✔
93
        self.output_compression = output_compression
6✔
94
        os.makedirs(output_folder, exist_ok=True)
6✔
95

96
    def generate_parameters(self):
6✔
97
        """Generate parameters for the parallel processor"""
98
        raise NotImplementedError("This method should be implemented in a subclass")
99

100
    def generate_tasks(self, points: pd.DataFrame):
6✔
101
        """Generate tasks for the parallel processor based on the parameters
102
        generated by the `generate_parameters` method.
103

104
        Parameters
105
        ----------
106
        points: pd.DataFrame
107
            DataFrame containing the points to be processed
108

109
        Returns
110
        -------
111
        list[tuple]
112
            List of tuples containing the model and the task to be processed
113
        """
114

115
        tasks = []
6✔
116
        for i in points.index:
6✔
117
            # Keep all the information, just change the values
118
            values = self.model.parameters.get_default_parameters()
6✔
119
            for k, v in points.loc[i].to_dict().items():
6✔
120
                values[k]["value"] = v
6✔
121
                values[k]["lower bound"] = self.bounds[k][0]
6✔
122
                values[k]["upper bound"] = self.bounds[k][1]
6✔
123
            newparams = self.model.parameters.__class__(
6✔
124
                parameters=values,
125
                hyperparameters=self.model.parameters.hyper,
126
            )
127

128
            # Create new model instance with new parameters
129
            newmodel = self.model.__class__(
6✔
130
                parameters=newparams,
131
                scenarios=self.model.scenarios,
132
                variables=self.model.variables,
133
                log_level=logging.CRITICAL,  # Suppress logging
134
            )
135

136
            # Generate the task to execute
137
            tasks.append((i, newmodel, *self.simulation_args))
6✔
138

139
        return tasks
6✔
140

141
    def sample(self, verbose: bool = False, points: pd.DataFrame | None = None):
6✔
142
        """Run in parallel the sampling of the model's parameter space
143
        by generating a set of tasks and executing them in parallel
144

145
        Parameters
146
        ----------
147
        verbose: bool (default False)
148
            Whether to print progress information
149
        """
150

151
        try:
6✔
152
            if points is None:
6✔
153
                self.points = self.generate_parameters()
6✔
154
            else:
155
                self.points = points
×
156

157
            # Run the parallel processing in batches to conserve memory
158
            if self.batchsize is None:
6✔
159
                self.batchsize = self.points.shape[0]
6✔
160

161
            batchcount = int(self.points.shape[0] / self.batchsize) + (
6✔
162
                self.points.shape[0] % self.batchsize > 0
163
            )
164

165
            start_time = dt.now()
6✔
166
            logger.info(
6✔
167
                f"Processing {self.points.shape[0]} tasks starting at {start_time}"
168
            )
169
            logger.info(f"Expecting to use {batchcount} batches")
6✔
170

171
            if not self.save_to_disk:
6✔
172
                all_outputs = {}
×
173

174
            for batch in range(batchcount):
6✔
175
                try:
6✔
176
                    if verbose and batch != 0:
6✔
177
                        elapsed = dt.now() - start_time
×
178
                        logger.info(
×
179
                            f"Processing batch {batch+1:05d} of {batchcount:05d}. Elapsed {elapsed} ({elapsed/batch} per batch)"
180
                        )
181

182
                    end = min([(batch + 1) * self.batchsize, self.points.shape[0]])
6✔
183
                    batch_tasks = self.generate_tasks(
6✔
184
                        points=self.points.iloc[batch * self.batchsize : end]
185
                    )
186

187
                    parameters = {
6✔
188
                        v[0]: v[1].parameters.get_values() for v in batch_tasks
189
                    }
190
                    pd.DataFrame(parameters).T.to_csv(
6✔
191
                        self.output_folder / f"parameters_{batch}.csv", index_label="id"
192
                    )
193

194
                    # Execute those tasks
195
                    raw_outputs = msbatchprocessing.parallel_processor(
6✔
196
                        tasks=batch_tasks,
197
                        worker=self.worker_function,
198
                        cpu_count=self.cpu_count,
199
                    )
200

201
                    # Save the outputs to disk
202
                    pd_outputs = self.transform_outputs(raw_outputs, batch=batch)
6✔
203
                    if self.save_to_disk:
6✔
204
                        self.save_outputs(pd_outputs, batch=batch)
6✔
205
                    else:
206
                        all_outputs[batch] = pd_outputs
×
207

208
                    # Clean up batch resources
209
                    del raw_outputs
6✔
210
                    gc.collect()
6✔
211

212
                except Exception as e:
6✔
213
                    logger.error(f"Error processing batch {batch}: {str(e)}")
6✔
214
                    raise
6✔
215

216
        except Exception as e:
6✔
217
            logger.error(f"Error in sampling process: {str(e)}")
6✔
218
            raise
6✔
219
        finally:
220
            # Clean up any remaining resources
221
            logger.info("Performing final cleanup")
6✔
222
            if hasattr(self, "tasks"):
6✔
223
                del self.tasks
×
224
            gc.collect()
6✔
225

226
        if not self.save_to_disk:
6✔
227
            names = ["batch", *all_outputs[0].index.names]
×
228
            return pd.concat(all_outputs, axis=0, names=names)
×
229

230
    def transform_outputs(self, raw_outputs: list, batch: int):
6✔
231
        """Concatenate the raw outputs into a single pandas dataframe
232

233
        Parameters
234
        ----------
235
        raw_outputs: list
236
            List of outputs from the parallel processing. By default,
237
            batchprocessing.timeseries_worker returns a tuple of
238
            (*task_arguments, output)
239
        batch: int
240
            Batch number to save the outputs. Assumes that
241
            the batchsize is constant.
242

243
        Returns
244
        -------
245
        output: pd.DataFrame
246
        """
247
        index_names = list(raw_outputs[0][-1].index.names)
6✔
248
        if all(x is None for x in index_names):
6✔
249
            index_names = [f"index{i+1}" for i in range(len(index_names))]
×
250
        data = {v[0]: v[-1] for v in raw_outputs}
6✔
251
        data = pd.concat(
6✔
252
            data.values(), keys=data.keys(), names=["ID"] + index_names, axis=0
253
        )
254
        return data
6✔
255

256
    def save_outputs(self, data: pd.DataFrame, batch: int):
6✔
257
        """Save the raw outputs to disk.
258

259
        The model's outputs are in the form of a pandas DataFrame.
260
        This method should save the outputs to disk in a format that
261
        can be easily read back in later. Generically, it writes a
262
        CSV file with the outputs in a MultiIndex format. However,
263
        this can be overridden to save in a different format.
264

265
        Parameters
266
        ----------
267
        data: pd.DataFrame
268
            The samples run in this dataset
269
        batch: int
270
            Batch number to save the outputs. Assumes that
271
            the batchsize is constant.
272
        """
273
        # Write the outputs in the requested file format
274
        if self.output_filetype == "csv":
6✔
275
            data.to_csv(
6✔
276
                self.output_folder / f"outputs_{batch}.csv",
277
                compression=self.output_compression,
278
            )
279
        elif self.output_filetype == "parquet":
6✔
UNCOV
280
            data.to_parquet(
×
281
                self.output_folder / f"outputs_{batch}.parquet",
282
                compression=self.output_compression,
283
            )
284
        else:
285
            raise ValueError(f"Invalid output filetype: {self.output_filetype}")
6✔
286

287
    def verify_bounds(self, bounds: dict) -> None:
6✔
288
        """Verify that the bounds are correctly set, in particular
289
        0. Check that the parameters are in the model
290
        1. That there is a lower and upper bound for each parameter
291
        2. That the lower bound is smaller than the upper bound
292
        3. That the bounds are in the correct order
293
        4. If the bounds are in logspace, that the bounds are either
294
        both positive or both negative
295
        5. If the bounds are in logspace, that neither bound is zero
296

297
        Parameters
298
        ----------
299
        bounds: dict[str, tuple]
300
            Dictionary containing the bounds for each parameter to be sampled
301
        logspace: bool
302
            Whether to sample the parameters in logspace
303

304
        Returns
305
        -------
306
        None
307

308
        Raises
309
        ------
310
        ValueError
311
            If the bounds are not correctly set
312
        """
313
        # Check that the bounds are correctly set
314
        for param, bound in bounds.items():
6✔
315
            if param not in self.model.parameters:
6✔
316
                raise ValueError(f"Parameter {param} not in the model's parameters")
6✔
317
            if len(bound) != 2:
6✔
318
                raise ValueError(
6✔
319
                    f"Bounds should be a list-like of length 2. {param}: {bound}"
320
                )
321
            if self.logspace and (bound[0] < 0) != (bound[1] < 0):
6✔
322
                msg = "Bounds should be either both positive or both negative"
6✔
323
                raise ValueError(f"{msg}. {param}: {bound}")
6✔
324
            if self.logspace and (bound[0] == 0 or bound[1] == 0):
6✔
325
                raise ValueError(
6✔
326
                    f"Bounds cannot be zero when using logspace. {param}: {bound}"
327
                )
328
            if bound[0] >= bound[1]:
6✔
329
                msg = "Lower bound should be smaller than the upper bound"
6✔
330
                raise ValueError(f"{msg}. {param}: {bound}")
6✔
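
The batching arithmetic in `sample` can be illustrated in isolation. The sketch below is not part of sampler.py: `batch_slices` is a hypothetical helper that mirrors how `batchcount` and the per-batch `iloc` ranges are derived, using plain integers instead of a DataFrame. The guard for an empty input is an addition of the sketch.

```python
from __future__ import annotations


def batch_slices(n_points: int, batchsize: int | None = None) -> list[tuple[int, int]]:
    """Return (start, end) index pairs, end exclusive, one per batch."""
    if n_points <= 0:
        # Guard added for this sketch: nothing to process.
        return []
    if batchsize is None:
        # As in sample(): without a batch size, everything is one batch.
        batchsize = n_points

    # Number of full batches, plus one partial batch if points are left over.
    batchcount = n_points // batchsize + (n_points % batchsize > 0)

    slices = []
    for batch in range(batchcount):
        start = batch * batchsize
        # The last batch is clipped to the number of points.
        end = min((batch + 1) * batchsize, n_points)
        slices.append((start, end))
    return slices


print(batch_slices(10, 4))  # [(0, 4), (4, 8), (8, 10)]
print(batch_slices(10))     # [(0, 10)]
```

Each pair corresponds to one `self.points.iloc[start:end]` slice, so ten points with a batch size of four yield two full batches and a final batch of two.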