• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

LCA-ActivityBrowser / activity-browser / 10319203045

09 Aug 2024 12:27PM UTC coverage: 54.132% (-0.3%) from 54.397%
10319203045

push

github

web-flow
Merge pull request #1336 from mrvisscher/its_the_final_logger

Logging changes

136 of 164 new or added lines in 50 files covered. (82.93%)

68 existing lines in 2 files now uncovered.

8280 of 15296 relevant lines covered (54.13%)

0.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.37
/activity_browser/bwutils/sensitivity_analysis.py
1
# -*- coding: utf-8 -*-
2

3
# =============================================================================
4
# Global Sensitivity Analysis (GSA) functions and class for the Delta
5
# Moment-Independent measure based on Monte Carlo simulation LCA results.
6
# see: https://salib.readthedocs.io/en/latest/api.html#delta-moment-independent-measure
7
# =============================================================================
8
import os
1✔
9
import traceback
1✔
10
from time import time
1✔
11
from logging import getLogger
1✔
12

13
import bw2calc as bc
1✔
14
import numpy as np
1✔
15
import pandas as pd
1✔
16
from SALib.analyze import delta
1✔
17

18
from activity_browser.mod import bw2data as bd
1✔
19

20
from ..settings import ab_settings
1✔
21
from .montecarlo import MonteCarloLCA, perform_MonteCarlo_LCA
1✔
22

23
try:
1✔
24
    # attempt bw25 import
25
    from bw2calc.graph_traversal import \
1✔
26
        AssumedDiagonalGraphTraversal as GraphTraversal
27
except ImportError:
1✔
28
    # standard import on failure
29
    from bw2calc import GraphTraversal
1✔
30

31
log = getLogger(__name__)
1✔
32

33

34
def get_lca(fu, method):
1✔
35
    """Calculates a non-stochastic LCA and returns a the LCA object."""
36
    lca = bc.LCA(fu, method=method)
×
37
    lca.lci()
×
38
    lca.lcia()
×
NEW
39
    log.info(f"Non-stochastic LCA score: {lca.score}")
×
40

41
    # add reverse dictionaries
42
    lca.activity_dict_rev, lca.product_dict_rev, lca.biosphere_dict_rev = (
×
43
        lca.reverse_dict()
44
    )
45

46
    return lca
×
47

48

49
def filter_technosphere_exchanges(fu, method, cutoff=0.05, max_calc=1e4):
1✔
50
    """Use brightway's GraphTraversal to identify the relevant
51
    technosphere exchanges in a non-stochastic LCA."""
52
    start = time()
×
53
    res = GraphTraversal().calculate(fu, method, cutoff=cutoff, max_calc=max_calc)
×
54

55
    # get all edges
56
    technosphere_exchange_indices = []
×
57
    for e in res["edges"]:
×
58
        if e["to"] != -1:  # filter out head introduced in graph traversal
×
59
            technosphere_exchange_indices.append((e["from"], e["to"]))
×
60
    log.info(
×
61
        "TECHNOSPHERE {} filtering resulted in {} of {} exchanges and took {} iterations in {} seconds.".format(
62
            res["lca"].technosphere_matrix.shape,
63
            len(technosphere_exchange_indices),
64
            res["lca"].technosphere_matrix.getnnz(),
65
            res["counter"],
66
            np.round(time() - start, 2),
67
        )
68
    )
69
    return technosphere_exchange_indices
×
70

71

72
def filter_biosphere_exchanges(lca, cutoff=0.005):
1✔
73
    """Reduce biosphere exchanges to those that matter for a given impact
74
    category in a non-stochastic LCA."""
75
    start = time()
×
76

77
    inv = lca.characterized_inventory
×
78
    finv = inv.multiply(abs(inv) > abs(lca.score / (1 / cutoff)))
×
79
    biosphere_exchange_indices = list(zip(*finv.nonzero()))
×
80
    explained_fraction = finv.sum() / lca.score
×
81
    log.info(
×
82
        "BIOSPHERE {} filtering resulted in {} of {} exchanges ({}% of total impact) and took {} seconds.".format(
83
            inv.shape,
84
            finv.nnz,
85
            inv.nnz,
86
            np.round(explained_fraction * 100, 2),
87
            np.round(time() - start, 2),
88
        )
89
    )
90
    return biosphere_exchange_indices
×
91

92

93
def get_exchanges(lca, indices, biosphere=False, only_uncertain=True):
1✔
94
    """Get actual exchange objects from indices.
95
    By default get only exchanges that have uncertainties.
96

97
    Returns
98
    -------
99
    exchanges : list
100
        List of exchange objects
101
    indices : list of tuples
102
        List of indices
103
    """
104
    exchanges = list()
×
105
    for i in indices:
×
106
        if biosphere:
×
107
            from_act = bd.get_activity(lca.biosphere_dict_rev[i[0]])
×
108
        else:  # technosphere
109
            from_act = bd.get_activity(lca.activity_dict_rev[i[0]])
×
110
        to_act = bd.get_activity(lca.activity_dict_rev[i[1]])
×
111

112
        for exc in to_act.exchanges():
×
113
            if exc.input == from_act.key:
×
114
                exchanges.append(exc)
×
115
                # continue  # if there was always only one max exchange between two activities
116

117
    # in theory there should be as many exchanges as indices, but since
118
    # multiple exchanges are possible between two activities, the number of
119
    # exchanges must be at least equal or higher to the number of indices
120
    if len(exchanges) < len(
×
121
        indices
122
    ):  # must have at least as many exchanges as indices (assu)
123
        raise ValueError(
×
124
            "Error: mismatch between indices provided ({}) and Exchanges received ({}).".format(
125
                len(indices), len(exchanges)
126
            )
127
        )
128

129
    # by default drop exchanges and indices if the have no uncertainties
130
    if only_uncertain:
×
131
        exchanges, indices = drop_no_uncertainty_exchanges(exchanges, indices)
×
132

133
    return exchanges, indices
×
134

135

136
def drop_no_uncertainty_exchanges(excs, indices):
1✔
137
    excs_no = list()
×
138
    indices_no = list()
×
139
    for exc, ind in zip(excs, indices):
×
140
        if exc.get("uncertainty type") and exc.get("uncertainty type") >= 1:
×
141
            excs_no.append(exc)
×
142
            indices_no.append(ind)
×
143
    log.info(
×
144
        "Dropping {} exchanges of {} with no uncertainty. {} remaining.".format(
145
            len(excs) - len(excs_no), len(excs), len(excs_no)
146
        )
147
    )
148
    return excs_no, indices_no
×
149

150

151
def get_exchanges_dataframe(exchanges, indices, biosphere=False):
1✔
152
    """Returns a Dataframe from the exchange data and a bit of additional information."""
153

154
    for exc, i in zip(exchanges, indices):
×
155
        from_act = bd.get_activity(exc.get("input"))
×
156
        to_act = bd.get_activity(exc.get("output"))
×
157

158
        exc.update(
×
159
            {
160
                "index": i,
161
                "from name": from_act.get("name", np.nan),
162
                "from location": from_act.get("location", np.nan),
163
                "to name": to_act.get("name", np.nan),
164
                "to location": to_act.get("location", np.nan),
165
            }
166
        )
167

168
        # GSA name (needs to yield unique labels!)
169
        if biosphere:
×
170
            exc.update(
×
171
                {
172
                    "GSA name": "B: {} // {} ({}) [{}]".format(
173
                        from_act.get("name", ""),
174
                        to_act.get("name", ""),
175
                        to_act.get("reference product", ""),
176
                        to_act.get("location", ""),
177
                    )
178
                }
179
            )
180
        else:
181
            exc.update(
×
182
                {
183
                    "GSA name": "T: {} FROM {} [{}] TO {} ({}) [{}]".format(
184
                        from_act.get("reference product", ""),
185
                        from_act.get("name", ""),
186
                        from_act.get("location", ""),
187
                        to_act.get("name", ""),
188
                        to_act.get("reference product", ""),
189
                        to_act.get("location", ""),
190
                    )
191
                }
192
            )
193

194
    return pd.DataFrame(exchanges)
×
195

196

197
def get_CF_dataframe(lca, only_uncertain_CFs=True):
1✔
198
    """Returns a dataframe with the metadata for the characterization factors
199
    (in the biosphere matrix). Filters non-stochastic CFs if desired (default)."""
200
    data = dict()
×
201
    for params_index, row in enumerate(lca.cf_params):
×
202
        if only_uncertain_CFs and row["uncertainty_type"] <= 1:
×
203
            continue
×
204
        cf_index = row["row"]
×
205
        bio_act = bd.get_activity(lca.biosphere_dict_rev[cf_index])
×
206

207
        data.update({params_index: bio_act.as_dict()})
×
208

209
        for name in row.dtype.names:
×
210
            data[params_index][name] = row[name]
×
211

212
        data[params_index]["index"] = cf_index
×
213
        data[params_index]["GSA name"] = (
×
214
            "CF: " + bio_act["name"] + str(bio_act["categories"])
215
        )
216

217
    log.info(
×
218
        "CHARACTERIZATION FACTORS filtering resulted in including {} of {} characteriation factors.".format(
219
            len(data),
220
            len(lca.cf_params),
221
        )
222
    )
223
    df = pd.DataFrame(data).T
×
224
    df.rename(columns={"uncertainty_type": "uncertainty type"}, inplace=True)
×
225
    return df
×
226

227

228
def get_parameters_DF(mc):
1✔
229
    """Function to make parameters dataframe"""
230
    if bool(mc.parameter_data):  # returns False if dict is empty
×
231
        dfp = pd.DataFrame(mc.parameter_data).T
×
232
        dfp["GSA name"] = "P: " + dfp["name"]
×
NEW
233
        log.info(f"PARAMETERS: {len(dfp)}")
×
234
        return dfp
×
235
    else:
236
        log.info("PARAMETERS: None included.")
×
237
        return pd.DataFrame()  # return emtpy df
×
238

239

240
def get_exchange_values(matrix, indices):
1✔
241
    """Get technosphere exchanges values from a list of exchanges
242
    (row and column information)"""
243
    return [matrix[i] for i in indices]
×
244

245

246
def get_X(matrix_list, indices):
1✔
247
    """Get the input data to the GSA, i.e. A and B matrix values for each
248
    model run."""
249
    X = np.zeros((len(matrix_list), len(indices)))
×
250
    for row, M in enumerate(matrix_list):
×
251
        X[row, :] = get_exchange_values(M, indices)
×
252
    return X
×
253

254

255
def get_X_CF(mc, dfcf, method):
1✔
256
    """Get the characterization factors used for each model run. Only those CFs
257
    that are in the dfcf dataframe will be returned (i.e. by default only the
258
    CFs that have uncertainties."""
259
    # get all CF inputs
260
    CF_data = np.array(mc.CF_dict[method])  # has the same shape as the Xa and Xb below
×
261

262
    # reduce this to uncertain CFs only (if this was done for the dfcf)
263
    params_indices = dfcf.index.values
×
264

265
    # if params_indices:
266
    return CF_data[:, params_indices]
×
267

268

269
def get_X_P(dfp):
1✔
270
    """Get the parameter values for each model run"""
271
    lists = [d for d in dfp["values"]]
×
272
    return list(zip(*lists))
×
273

274

275
def get_problem(X, names):
1✔
276
    return {
×
277
        "num_vars": X.shape[1],
278
        "names": names,
279
        "bounds": list(zip(*(np.amin(X, axis=0), np.amax(X, axis=0)))),
280
    }
281

282

283
class GlobalSensitivityAnalysis(object):
1✔
284
    """Class for Global Sensitivity Analysis.
285
    For now Delta Moment Independent Measure based on:
286
    https://salib.readthedocs.io/en/latest/api.html#delta-moment-independent-measure
287
    Builds on top of Monte Carlo Simulation results.
288
    """
289

290
    def __init__(self, mc):
1✔
291
        self.update_mc(mc)
×
292
        self.act_number = int()
×
293
        self.method_number = int()
×
294
        self.cutoff_technosphere = float()
×
295
        self.cutoff_biosphere = float()
×
296

297
    def update_mc(self, mc):
1✔
298
        "Update the Monte Carlo Simulation object (and results)."
299
        try:
×
300
            assert isinstance(mc, MonteCarloLCA)
×
301
            self.mc = mc
×
302
        except AssertionError:
×
303
            raise AssertionError(
×
304
                "mc should be an instance of MonteCarloLCA, but instead it is a {}.".format(
305
                    type(mc)
306
                )
307
            )
308

309
    def perform_GSA(
1✔
310
        self,
311
        act_number=0,
312
        method_number=0,
313
        cutoff_technosphere=0.01,
314
        cutoff_biosphere=0.01,
315
    ):
316
        """Perform GSA for specific reference flow and impact category."""
317
        start = time()
×
318

319
        # set FU and method
320
        try:
×
321
            self.act_number = act_number
×
322
            self.method_number = method_number
×
323
            self.cutoff_technosphere = cutoff_technosphere
×
324
            self.cutoff_biosphere = cutoff_biosphere
×
325

326
            self.fu = self.mc.cs["inv"][act_number]
×
327
            self.activity = bd.get_activity(self.mc.rev_activity_index[act_number])
×
328
            self.method = self.mc.cs["ia"][method_number]
×
329

330
        except Exception as e:
×
331
            traceback.print_exc()
×
332
            # todo: QMessageBox.warning(self, 'Could not perform Delta analysis', str(e))
333
            log.error("Initializing the GSA failed.")
×
334
            return None
×
335

336
        log.info(
×
337
            f"-- GSA --\n Project: {bd.projects.current} CS: {self.mc.cs_name} "
338
            f"Activity: {self.activity} Method: {self.method}",
339
        )
340

341
        # get non-stochastic LCA object with reverse dictionaries
342
        self.lca = get_lca(self.fu, self.method)
×
343

344
        # =============================================================================
345
        #   Filter exchanges and get metadata DataFrames
346
        # =============================================================================
347
        dfs = []
×
348
        # technosphere
349
        if self.mc.include_technosphere:
×
350
            self.t_indices = filter_technosphere_exchanges(
×
351
                self.fu, self.method, cutoff=cutoff_technosphere, max_calc=1e4
352
            )
353
            self.t_exchanges, self.t_indices = get_exchanges(self.lca, self.t_indices)
×
354
            self.dft = get_exchanges_dataframe(self.t_exchanges, self.t_indices)
×
355
            if not self.dft.empty:
×
356
                dfs.append(self.dft)
×
357

358
        # biosphere
359
        if self.mc.include_biosphere:
×
360
            self.b_indices = filter_biosphere_exchanges(
×
361
                self.lca, cutoff=cutoff_biosphere
362
            )
363
            self.b_exchanges, self.b_indices = get_exchanges(
×
364
                self.lca, self.b_indices, biosphere=True
365
            )
366
            self.dfb = get_exchanges_dataframe(
×
367
                self.b_exchanges, self.b_indices, biosphere=True
368
            )
369
            if not self.dfb.empty:
×
370
                dfs.append(self.dfb)
×
371

372
        # characterization factors
373
        if self.mc.include_cfs:
×
374
            self.dfcf = get_CF_dataframe(
×
375
                self.lca, only_uncertain_CFs=True
376
            )  # None if no stochastic CFs
377
            if not self.dfcf.empty:
×
378
                dfs.append(self.dfcf)
×
379

380
        # parameters
381
        # todo: if parameters, include df, but remove exchanges from T and B (skipped for now)
382
        self.dfp = get_parameters_DF(self.mc)  # Empty df if no parameters
×
383
        if not self.dfp.empty:
×
384
            dfs.append(self.dfp)
×
385

386
        # Join dataframes to get metadata
387
        self.metadata = pd.concat(dfs, axis=0, ignore_index=True, sort=False)
×
388
        self.metadata.set_index("GSA name", inplace=True)
×
389

390
        # =============================================================================
391
        #     GSA
392
        # =============================================================================
393

394
        # Get X (Technosphere, Biosphere and CF values)
395
        X_list = list()
×
396
        if self.mc.include_technosphere and self.t_indices:
×
397
            self.Xa = get_X(self.mc.A_matrices, self.t_indices)
×
398
            X_list.append(self.Xa)
×
399
        if self.mc.include_biosphere and self.b_indices:
×
400
            self.Xb = get_X(self.mc.B_matrices, self.b_indices)
×
401
            X_list.append(self.Xb)
×
402
        if self.mc.include_cfs and not self.dfcf.empty:
×
403
            self.Xc = get_X_CF(self.mc, self.dfcf, self.method)
×
404
            X_list.append(self.Xc)
×
405
        if self.mc.include_parameters and not self.dfp.empty:
×
406
            self.Xp = get_X_P(self.dfp)
×
407
            X_list.append(self.Xp)
×
408

409
        self.X = np.concatenate(X_list, axis=1)
×
410
        # print('X', self.X.shape)
411

412
        # Get Y (LCA scores)
413
        self.Y = self.mc.get_results_dataframe(act_key=self.activity.key)[
×
414
            self.method
415
        ].to_numpy()
416

417
        # log-transformation. This makes it more robust for very uneven distributions of LCA results (e.g. toxicity related impacts).
418
        # Can only be applied if all Monte-Carlo LCA scores are either positive or negative.
419
        # Should not be used when LCA scores overlap zero (sometimes positive and sometimes negative)
420
        # if np.all(self.Y > 0) if self.Y[0] > 0 else np.all(self.Y < 0):  # check if all LCA scores are of the same sign
421
        #     self.Y = np.log(np.abs(self.Y))  # this makes it more robust for very uneven distributions of LCA results
422
        if np.all(self.Y > 0):  # all positive numbers
×
423
            self.Y = np.log(np.abs(self.Y))
×
424
            log.info("All positive LCA scores. Log-transformation performed.")
×
425
        elif np.all(self.Y < 0):  # all negative numbers
×
426
            self.Y = -np.log(np.abs(self.Y))
×
427
            log.info("All negative LCA scores. Log-transformation performed.")
×
428
        else:  # mixed positive and negative numbers
429
            log.warning(
×
430
                "Log-transformation cannot be applied as LCA scores overlap zero."
431
            )
432

433
        # print('Filtering took {} seconds'.format(np.round(time() - start, 2)))
434

435
        # define problem
436
        self.names = self.metadata.index  # ['GSA name']
×
437
        # print('Names:', len(self.names))
438
        self.problem = get_problem(self.X, self.names)
×
439

440
        # perform delta analysis
441
        time_delta = time()
×
442
        self.Si = delta.analyze(self.problem, self.X, self.Y, print_to_console=False)
×
443
        log.info(
×
444
            "Delta analysis took {} seconds".format(
445
                np.round(time() - time_delta, 2),
446
            )
447
        )
448

449
        # put GSA results in to dataframe
450
        self.dfgsa = pd.DataFrame(self.Si, index=self.names).sort_values(
×
451
            by="delta", ascending=False
452
        )
453
        self.dfgsa.index.names = ["GSA name"]
×
454

455
        # join with metadata
456
        self.df_final = self.dfgsa.join(self.metadata, on="GSA name")
×
457
        self.df_final.reset_index(inplace=True)
×
458
        self.df_final["pedigree"] = [str(x) for x in self.df_final["pedigree"]]
×
459

460
        log.info("GSA took {} seconds".format(np.round(time() - start, 2)))
×
461

462
    def get_save_name(self):
1✔
463
        save_name = (
×
464
            self.mc.cs_name
465
            + "_"
466
            + str(self.mc.iterations)
467
            + "_"
468
            + self.activity["name"]
469
            + "_"
470
            + str(self.method)
471
            + ".xlsx"
472
        )
473
        save_name = save_name.replace(",", "").replace("'", "").replace("/", "")
×
474
        return save_name
×
475

476
    def export_GSA_output(self):
1✔
477
        save_name = "gsa_output_" + self.get_save_name()
×
478
        self.df_final.to_excel(os.path.join(ab_settings.data_dir, save_name))
×
479

480
    def export_GSA_input(self):
1✔
481
        """Export the input data to the GSA with a human readible index"""
482
        X_with_index = pd.DataFrame(self.X.T, index=self.metadata.index)
×
483
        save_name = "gsa_input_" + self.get_save_name()
×
484
        X_with_index.to_excel(os.path.join(ab_settings.data_dir, save_name))
×
485

486

487
if __name__ == "__main__":
1✔
488
    mc = perform_MonteCarlo_LCA(project="ei34", cs_name="kraft paper", iterations=20)
×
489
    g = GlobalSensitivityAnalysis(mc)
×
490
    g.perform_GSA(
×
491
        act_number=0, method_number=1, cutoff_technosphere=0.01, cutoff_biosphere=0.01
492
    )
493
    g.export()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc