
KarlNaumann / MacroStat / build 21338829278

25 Jan 2026 08:12PM UTC coverage: 95.866% (-0.7%) from 96.528%

Pull Request #62: Difftool multiprocessing
Merge 2ba49e0f9 into 924f2aeff (github, web-flow)

343 of 350 branches covered (98.0%). Branch coverage is included in the aggregate %.
159 of 186 new or added lines in 4 files covered (85.48%).
3 existing lines in 1 file are now uncovered.
2208 of 2311 relevant lines covered (95.54%).
0.96 hits per line.

Source File: /src/macrostat/diff/jacobian_numerical.py (82.19% covered)
"""
Numerical Jacobian computation using finite differences.
"""

import copy
import logging
import warnings
from typing import Callable, Dict, Literal

import torch
import torch.multiprocessing as mp

from macrostat.diff.jacobian_base import JacobianBase
from macrostat.util.batchprocessing import parallel_processor

logger = logging.getLogger(__name__)

LossFn = Callable[[Dict[str, torch.Tensor]], torch.Tensor]


def jacobian_worker(task):
    """Worker function for parallel Jacobian computation.

    Parameters
    ----------
    task : tuple
        Task tuple: (model, loss_fn, task_id, scenario)
        - model: MacroStat Model instance with perturbed parameters
        - loss_fn: Loss function
        - task_id: Tuple (param_name, direction)
        - scenario: Scenario index or name to use

    Returns
    -------
    tuple
        (*task_id, loss), where loss is the output of loss_fn
    """
    model, loss_fn, task_id, scenario = task
    try:
        output = model.simulate(scenario=scenario)
        loss = loss_fn(output)
        if isinstance(loss, torch.Tensor):
            loss = loss.detach()

        return (*task_id, loss)
    except Exception as e:
        logger.error(f"Worker failed for task {task_id}: {str(e)}")
        raise


class JacobianNumerical(JacobianBase):
    """
    Compute Jacobians using numerical finite differences.

    This class supports:
    - Direct and log-space parameter perturbations
    - Multiprocessing for large-scale models
    - Non-scalar loss functions (e.g., time-series outputs)
    """

    def __init__(
        self,
        model,
        scenario: int | str = 0,
        epsilon: float = 1e-5,
        parameter_space: Literal["direct", "log"] = "direct",
    ):
        """
        Initialize numerical Jacobian computation.

        Parameters
        ----------
        model : Model
            MacroStat model instance
        scenario : int | str, optional
            Scenario to use for computation, by default 0
        epsilon : float, optional
            Perturbation size for finite differences, by default 1e-5
        parameter_space : {"direct", "log"}, optional
            Space in which to apply perturbations, by default "direct"
        """
        super().__init__(model, scenario)
        self.epsilon = epsilon
        self.parameter_space = parameter_space

    def _validate_log_space_params(self, param_names: list[str]):
        """Validate parameters for log-space computation.

        Parameters
        ----------
        param_names : list[str]
            List of parameter names to validate.

        Raises
        ------
        ValueError
            If any zero parameters are found.

        Warns
        -----
        UserWarning
            If any negative parameters are found.
        """
        zero_params = []
        negative_params = []

        for name in param_names:
            value = self.model.parameters[name]

            if value == 0:
                zero_params.append(name)
            if value < 0:
                negative_params.append(name)

        if zero_params:
            raise ValueError(
                f"Cannot use log-space with zero parameters: {zero_params}"
            )

        if negative_params:
            warnings.warn(
                f"Found negative parameters in log-space mode: {negative_params}. "
                "Using sign-preserving transformation: sign(p) * exp(log(abs(p)) + epsilon)"
            )

    def _apply_perturbation(
        self,
        param_value: float,
        direction: Literal["pos", "neg"],
    ) -> float:
        """Apply a perturbation to a scalar parameter value.

        Parameters
        ----------
        param_value : float
            Original parameter value (scalar)
        direction : {"pos", "neg"}
            Direction of perturbation

        Returns
        -------
        float
            Perturbed parameter value
        """
        # Convert to tensor for computation
        value_tensor = torch.tensor(param_value)

        # Apply perturbation
        if self.parameter_space == "log":
            # Log-space perturbation (sign-preserving)
            sign = torch.sign(value_tensor)
            abs_value = torch.abs(value_tensor)
            log_value = torch.log(abs_value)

            if direction == "pos":
                new_value = sign * torch.exp(log_value + self.epsilon)
            else:  # neg
                new_value = sign * torch.exp(log_value - self.epsilon)
        else:
            # Direct-space perturbation
            if direction == "pos":
                new_value = value_tensor + self.epsilon
            else:  # neg
                new_value = value_tensor - self.epsilon

        return new_value.item()

    def _generate_tasks(
        self,
        param_names: list[str],
        loss_fn: LossFn,
        mode: Literal["central", "forward", "backward"],
    ) -> list:
        """Generate tasks for parallel processing.

        Parameters
        ----------
        param_names : list[str]
            List of parameter names to differentiate
        loss_fn : callable
            Loss function
        mode : {"central", "forward", "backward"}
            Finite-difference mode

        Returns
        -------
        list
            List of task tuples: (model, loss_fn, task_id, scenario)
        """
        tasks = []

        if self.parameter_space == "log":
            self._validate_log_space_params(param_names)

        for param_name in param_names:
            base_value = self.model.parameters[param_name]

            if mode in {"central", "forward"}:
                # Positive perturbation
                par_pos = copy.deepcopy(self.model.parameters)
                perturbed_value = self._apply_perturbation(base_value, "pos")
                par_pos[param_name] = perturbed_value
                # Create new Scenarios with the modified parameters and copy
                # all custom (non-default) scenarios from the original model
                sce_pos = self.model.scenarios.__class__(parameters=par_pos)
                for sc_id, sc_info in self.model.scenarios.info.items():
                    if sc_id != 0:  # Skip default scenario
                        ts_copy = copy.deepcopy(self.model.scenarios.timeseries[sc_id])
                        sce_pos.add_scenario(timeseries=ts_copy, name=sc_info["Name"])
                model_pos = self.model.__class__(parameters=par_pos, scenarios=sce_pos)
                task_id_pos = (param_name, "pos")
                tasks.append((model_pos, loss_fn, task_id_pos, self.scenario))

            if mode in {"central", "backward"}:
                # Negative perturbation
                par_neg = copy.deepcopy(self.model.parameters)
                perturbed_value = self._apply_perturbation(base_value, "neg")
                par_neg[param_name] = perturbed_value
                # Create new Scenarios with the modified parameters and copy
                # all custom (non-default) scenarios from the original model
                sce_neg = self.model.scenarios.__class__(parameters=par_neg)
                for sc_id, sc_info in self.model.scenarios.info.items():
                    if sc_id != 0:  # Skip default scenario
                        ts_copy = copy.deepcopy(self.model.scenarios.timeseries[sc_id])
                        sce_neg.add_scenario(timeseries=ts_copy, name=sc_info["Name"])
                model_neg = self.model.__class__(parameters=par_neg, scenarios=sce_neg)
                task_id_neg = (param_name, "neg")
                tasks.append((model_neg, loss_fn, task_id_neg, self.scenario))

        return tasks

    def compute(
        self,
        loss_fn: LossFn,
        mode: Literal["central", "forward", "backward"] = "central",
        num_workers: int | None = None,
        progress_bar: bool = False,
        param_names: list[str] | None = None,
    ) -> Dict[str, torch.Tensor]:
        """
        Compute the numerical Jacobian of the loss w.r.t. model parameters.

        Parameters
        ----------
        loss_fn : callable
            Loss function that takes the model output dict and returns a tensor
        mode : {"central", "forward", "backward"}, optional
            Finite-difference mode, by default "central"
        num_workers : int, optional
            Number of parallel workers; by default uses all CPUs
        progress_bar : bool, optional
            Whether to show a progress bar, by default False
        param_names : list[str], optional
            List of parameter names to differentiate. If None, differentiates
            all parameters in model.parameters.values, by default None

        Returns
        -------
        dict[str, torch.Tensor]
            Dictionary mapping parameter names to their Jacobian tensors.
            Each tensor has shape (*loss_shape, *param_shape).
        """
        if mode not in {"central", "forward", "backward"}:
            raise ValueError(
                f"Unsupported mode '{mode}'. Use 'central', 'forward', or 'backward'."
            )

        if param_names is None:
            param_names = list(self.model.parameters.values.keys())

        output_base = self.model.simulate(scenario=self.scenario)
        loss_base = loss_fn(output_base)

        tasks = self._generate_tasks(param_names, loss_fn, mode)

        if not tasks:
            return {}

        if num_workers is None:
            num_workers = mp.cpu_count()

        results = parallel_processor(
            tasks=tasks,
            worker=jacobian_worker,
            cpu_count=num_workers,
            sharing_strategy=None,  # No sharing - atomic tensors
            progress_bar=progress_bar,
        )

        results_by_param: Dict[str, Dict[str, torch.Tensor]] = {}
        for result in results:
            param_name, direction, loss_value = result
            if param_name not in results_by_param:
                results_by_param[param_name] = {}
            if not isinstance(loss_value, torch.Tensor):
                loss_value = torch.tensor(loss_value)
            results_by_param[param_name][direction] = loss_value

        jacobian: Dict[str, torch.Tensor] = {}
        for param_name in param_names:
            losses = results_by_param.get(param_name, {})

            if mode == "central" and "pos" in losses and "neg" in losses:
                grad = (losses["pos"] - losses["neg"]) / (2.0 * self.epsilon)
            elif mode == "forward" and "pos" in losses:
                grad = (losses["pos"] - loss_base) / self.epsilon
            elif mode == "backward" and "neg" in losses:
                grad = (loss_base - losses["neg"]) / self.epsilon
            else:
                grad = torch.zeros_like(loss_base)

            jacobian[param_name] = grad

        self.jacobian = jacobian
        return jacobian
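
For reference, a minimal usage sketch of the class above. MyModel, my_parameters, the "GDP" output key, and target_gdp are hypothetical stand-ins; any MacroStat Model subclass exposing parameters, scenarios, and simulate(scenario=...) should slot in the same way.

import torch

from macrostat.diff.jacobian_numerical import JacobianNumerical

# Hypothetical model and target series; substitute a real MacroStat model.
model = MyModel(parameters=my_parameters)
target_gdp = torch.ones(100)

def loss_fn(output):
    # Scalar loss: squared deviation of the simulated "GDP" series
    # (a hypothetical output key) from the target path.
    return ((output["GDP"] - target_gdp) ** 2).sum()

jac = JacobianNumerical(model, scenario=0, epsilon=1e-5, parameter_space="log")
jacobian = jac.compute(
    loss_fn,
    mode="central",     # (loss(p_pos) - loss(p_neg)) / (2 * epsilon)
    num_workers=4,      # defaults to mp.cpu_count() when None
    progress_bar=True,
)
# Each entry has shape (*loss_shape, *param_shape); here the loss is scalar,
# so jacobian[name] is a single finite-difference gradient per parameter.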
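
The sign-preserving log-space transformation that _validate_log_space_params warns about can also be checked in isolation. A self-contained snippet with an illustrative negative parameter:

import torch

epsilon = 1e-5
p = torch.tensor(-2.0)  # negative parameter: allowed in log space, with a warning

# sign(p) * exp(log(|p|) +/- epsilon): the magnitude moves multiplicatively
# while the sign is preserved.
p_pos = torch.sign(p) * torch.exp(torch.log(torch.abs(p)) + epsilon)
p_neg = torch.sign(p) * torch.exp(torch.log(torch.abs(p)) - epsilon)

print(p_pos.item())  # ~ -2.00002: the "pos" direction grows the magnitude
print(p_neg.item())  # ~ -1.99998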
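
Likewise, the three difference formulas in compute() can be sanity-checked against a toy function with a known derivative:

import torch

epsilon = 1e-5
p = torch.tensor(1.5, dtype=torch.float64)

def f(p):
    return p ** 3  # toy "loss" with analytic derivative 3 * p**2 = 6.75 at p = 1.5

loss_base = f(p)
central = (f(p + epsilon) - f(p - epsilon)) / (2.0 * epsilon)
forward = (f(p + epsilon) - loss_base) / epsilon
backward = (loss_base - f(p - epsilon)) / epsilon

print(central.item())   # ~ 6.7500000, error O(epsilon**2)
print(forward.item())   # ~ 6.7500450, error O(epsilon)
print(backward.item())  # ~ 6.7499550, error O(epsilon)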