quaquel / EMAworkbench / build 18214982978

03 Oct 2025 06:39AM UTC coverage: 88.703% (+0.04%) from 88.664%

Pull Request #422: ruff fixes (github / web-flow)
Merge fe026872f into 592d0cd98

53 of 73 new or added lines in 16 files covered. (72.6%)
2 existing lines in 2 files now uncovered.
7852 of 8852 relevant lines covered (88.7%)
0.89 hits per line

Source File: /ema_workbench/util/utilities.py (57.32% covered)
"""Convenience functions and classes used throughout the package."""

import configparser
import json
import os
import tarfile
from io import BytesIO

import numpy as np
import pandas as pd

from . import EMAError, get_module_logger

# Created on 13 jan. 2011
#
# .. codeauthor:: jhkwakkel <j.h.kwakkel (at) tudelft (dot) nl>

__all__ = ["load_results", "merge_results", "process_replications", "save_results"]
_logger = get_module_logger(__name__)


def load_results(file_name):
    """Load the specified tar.gz file.

    The file is assumed to have been saved using save_results.

    Parameters
    ----------
    file_name : str
                the path to the file

    Raises
    ------
    IOError if file not found

    """
    from ..em_framework.outcomes import AbstractOutcome, register  # noqa: PLC0415

    file_name = os.path.abspath(file_name)

    with tarfile.open(file_name, "r:gz", encoding="UTF8") as archive:
        try:
            f = archive.extractfile("metadata.json")
        except KeyError:
            # old style data file
            results = load_results_old(archive)
            _logger.info(f"results loaded successfully from {file_name}")
            return results

        metadata = json.loads(f.read().decode())

        # load experiments
        f = archive.extractfile("experiments.csv")
        experiments = pd.read_csv(f)

        for name, dtype in metadata["experiments"].items():
            try:
                dtype = np.dtype(dtype)  # noqa: PLW2901
            except TypeError:
                dtype = pd.api.types.pandas_dtype(dtype)  # noqa: PLW2901

            if experiments[name].dtype != dtype:
                experiments[name] = experiments[name].astype(dtype)
            # this check is for backward compatibility with data stored with 2.4.
            if pd.api.types.is_object_dtype(dtype):
                experiments[name] = experiments[name].astype("category")

        # load outcomes
        outcomes = {}
        known_outcome_classes = {
            entry.__name__: entry for entry in AbstractOutcome.get_subclasses()
        }
        for outcome_type, name, filename in metadata["outcomes"]:
            outcome = known_outcome_classes[outcome_type](name)  # noqa: F841

            values = register.deserialize(name, filename, archive)
            outcomes[name] = values

    _logger.info(f"results loaded successfully from {file_name}")
    return experiments, outcomes

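# Usage sketch (illustrative only; the path is hypothetical and the archive is
# assumed to have been written by save_results):
#
#     experiments, outcomes = load_results("./data/1000_scenarios.tar.gz")
#     print(experiments.shape)        # one row per experiment
#     print(list(outcomes.keys()))    # one entry per outcome of interest
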

def load_results_old(archive):
    """Load results stored in the old bz2-based format.

    The archive is assumed to have been saved using an older version of
    save_results.

    Parameters
    ----------
    archive : TarFile

    Raises
    ------
    IOError if file not found

    """
    from ..em_framework.outcomes import ArrayOutcome, ScalarOutcome  # noqa: PLC0415

    outcomes = {}

    # load x
    experiments = archive.extractfile("experiments.csv")
    if not hasattr(experiments, "read"):
        raise EMAError(repr(experiments))

    experiments = pd.read_csv(experiments)

    # load experiment metadata
    metadata = archive.extractfile("experiments metadata.csv").readlines()

    for entry in metadata:
        entry = entry.decode("UTF-8")  # noqa: PLW2901
        entry = entry.strip()  # noqa: PLW2901
        entry = entry.split(",")  # noqa: PLW2901
        name, dtype = (str(item) for item in entry)

        try:
            dtype = np.dtype(dtype)
        except TypeError:
            dtype = pd.api.types.pandas_dtype(dtype)

        if pd.api.types.is_object_dtype(dtype):
            experiments[name] = experiments[name].astype("category")

    # load outcome metadata
    metadata = archive.extractfile("outcomes metadata.csv").readlines()
    metadata = [entry.decode("UTF-8") for entry in metadata]
    metadata = [entry.strip() for entry in metadata]
    metadata = [tuple(entry.split(",")) for entry in metadata]
    metadata = {entry[0]: entry[1:] for entry in metadata}

    # load outcomes
    for outcome, shape in metadata.items():
        # the shape was stored as a stringified tuple; strip the parentheses
        # and parse the remaining entries into ints
        shape = list(shape)  # noqa: PLW2901
        shape[0] = shape[0][1:]
        shape[-1] = shape[-1][0:-1]

        temp_shape = []
        for entry in shape:
            if entry:
                try:
                    temp_shape.append(int(entry))
                except ValueError:
                    temp_shape.append(int(entry[0:-1]))
        shape = tuple(temp_shape)  # noqa: PLW2901

        if len(shape) > 2:
            nr_files = shape[-1]

            data = np.empty(shape)
            for i in range(nr_files):
                values = archive.extractfile(f"{outcome}_{i}.csv")
                values = pd.read_csv(values, index_col=False, header=None).values
                data[:, :, i] = values

        else:
            data = archive.extractfile(f"{outcome}.csv")
            data = pd.read_csv(data, index_col=False, header=None).values
            data = np.reshape(data, shape)

        outcomes[outcome] = data

    # reformat outcomes from generic dict to new style OutcomesDict
    outcomes_new = {}
    for k, v in outcomes.items():
        outcome = ScalarOutcome(k) if v.ndim == 1 else ArrayOutcome(k)

        outcomes_new[outcome.name] = v

    return experiments, outcomes_new

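# For reference, the legacy archive layout read above, as inferred from this
# loader (not a formal spec): "experiments.csv" holds the experiments,
# "experiments metadata.csv" maps column names to dtypes, "outcomes
# metadata.csv" maps each outcome name to its stringified shape, and each
# outcome lives in "{outcome}.csv", or in "{outcome}_{i}.csv" per slice for
# outcomes with more than two dimensions.
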

def save_results(results, file_name):
    """Save the results to the specified tar.gz file.

    How the results are stored depends on their type. Experiments are saved
    as csv. Outcomes depend on the outcome type: scalars and <3D arrays are
    saved as csv files, higher dimensional arrays as .npy files.

    Parameters
    ----------
    results : tuple
              the return of perform_experiments
    file_name : str
                the path of the file

    Raises
    ------
    IOError if file not found

    """
    from ..em_framework.outcomes import register  # noqa: PLC0415

    VERSION = 0.1  # noqa: N806
    file_name = os.path.abspath(file_name)

    def add_file(tararchive, stream, filename):
        # add an in-memory stream to the archive under the given filename
        stream.seek(0)
        tarinfo = tarfile.TarInfo(filename)
        tarinfo.size = len(stream.getbuffer())
        tararchive.addfile(tarinfo, stream)

    experiments, outcomes = results
    with tarfile.open(file_name, "w:gz") as z:
        # store experiments
        stream = BytesIO()
        stream.write(
            experiments.to_csv(header=True, encoding="UTF-8", index=False).encode()
        )
        add_file(z, stream, "experiments.csv")

        # store outcomes
        outcomes_metadata = []
        for key, value in outcomes.items():
            klass = register.outcomes[key]
            stream, filename = register.serialize(key, value)
            add_file(z, stream, filename)
            outcomes_metadata.append((klass.__name__, key, filename))

        # store metadata
        metadata = {
            "version": VERSION,
            "outcomes": outcomes_metadata,
            "experiments": {k: v.name for k, v in experiments.dtypes.to_dict().items()},
        }

        stream = BytesIO()
        stream.write(json.dumps(metadata).encode())
        add_file(z, stream, "metadata.json")

    _logger.info(f"results saved successfully to {file_name}")

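# Round-trip sketch (illustrative; `model` and the path are hypothetical, and
# perform_experiments is assumed to return the (experiments, outcomes) tuple
# this module works with):
#
#     from ema_workbench import perform_experiments
#
#     results = perform_experiments(model, scenarios=100)
#     save_results(results, "./data/100_scenarios.tar.gz")
#     experiments, outcomes = load_results("./data/100_scenarios.tar.gz")
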

def merge_results(results1, results2):
    """Convenience function for merging results from the workbench.

    The function merges results2 with results1: the experiments are
    concatenated into a single DataFrame, and for the outcomes only the keys
    present in both results are retained, with their values concatenated
    along the first axis. The function assumes that the ordering of dtypes
    and names is identical in both results.

    A typical use case for this function is in combination with
    :func:`~util.experiments_to_cases`. Using :func:`~util.experiments_to_cases`
    one extracts the cases from a first set of experiments. One then
    performs these cases on a different model or policy, and then one wants to
    merge these new results with the old results for further analysis.

    Parameters
    ----------
    results1 : tuple
               first results to be merged
    results2 : tuple
               second results to be merged

    Returns
    -------
    the merged results

    """
    # start of merging
    exp1, res1 = results1
    exp2, res2 = results2

    # merge x
    merged_exp = pd.concat([exp1, exp2], axis=0)
    merged_exp.reset_index(drop=True, inplace=True)

    # only merge the results that are in both
    keys = set(res1.keys()).intersection(set(res2.keys()))
    _logger.info(f"intersection of keys: {keys}")

    # merging results
    merged_res = {}
    for key in keys:
        _logger.info(f"merge {key}")

        value1 = res1.get(key)
        value2 = res2.get(key)
        merged_value = np.concatenate([value1, value2])
        merged_res[key] = merged_value

    mr = (merged_exp, merged_res)
    return mr

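# Usage sketch (illustrative; the file names are hypothetical):
#
#     results_base = load_results("./data/base_policy.tar.gz")
#     results_new = load_results("./data/new_policy.tar.gz")
#     experiments, outcomes = merge_results(results_base, results_new)
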

def get_ema_project_home_dir():
    """Return the EMA project home directory as set in expworkbench.cfg.

    Falls back to the current working directory if no valid config is found.
    """
    try:
        config_file_name = "expworkbench.cfg"
        directory = os.path.dirname(__file__)
        fn = os.path.join(directory, config_file_name)

        config = configparser.ConfigParser()
        parsed = config.read(fn)

        if parsed:
            _logger.info(f"config loaded from {parsed[0]}")
        else:
            _logger.info("no config file found")

        home_dir = config.get("ema_project_home", "home_dir")
        return home_dir
    except BaseException:
        # fall back to the current working directory if anything goes wrong
        return os.getcwd()

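# The config file read above uses standard configparser syntax; a minimal
# sketch of expworkbench.cfg (the path value is hypothetical):
#
#     [ema_project_home]
#     home_dir = /home/user/ema_projects
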

def process_replications(data, aggregation_func=np.mean):
    """Convenience function for processing the replications of stochastic model outcomes.

    The default behavior is to take the mean of the replications. This reduces
    the dimensionality of the outcomes from
    (experiments * replications * outcome_shape) to
    (experiments * outcome_shape), where outcome_shape is 0-d for scalars,
    1-d for time series, and 2-d for arrays.

    The function can take either the outcomes (dictionary: keys are outcomes
    of interest, values are arrays of data) or the results (tuple: experiments
    as DataFrame, outcomes as dictionary) of a set of simulation experiments.

    Parameters
    ----------
    data : dict, tuple
        outcomes or results of a set of experiments
    aggregation_func : callable, optional
        aggregation function to be applied, defaults to np.mean.

    Returns
    -------
    dict, tuple

    """
    if isinstance(data, dict):
        # replications are the second dimension of the outcome arrays
        outcomes_processed = {key: aggregation_func(data[key], axis=1) for key in data}
        return outcomes_processed
    elif (
        isinstance(data, tuple)
        and isinstance(data[0], pd.DataFrame)
        and isinstance(data[1], dict)
    ):
        experiments, outcomes = data  # split results
        outcomes_processed = {
            key: aggregation_func(outcomes[key], axis=1) for key in outcomes
        }
        results_processed = (experiments.copy(deep=True), outcomes_processed)
        return results_processed

    else:
        raise EMAError(f"data should be a dict or tuple, but is a {type(data)}")
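
# Usage sketch (illustrative, with toy data): 10 experiments, 5 replications,
# and a time series of length 100 per replication collapse to shape (10, 100):
#
#     outcomes = {"infected": np.random.rand(10, 5, 100)}
#     processed = process_replications(outcomes)
#     print(processed["infected"].shape)  # (10, 100)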