LCA-ActivityBrowser / activity-browser / 12769607583

14 Jan 2025 02:20PM UTC coverage: 53.274% (-0.04%) from 53.311%

Pull Request #1430: fix wrong logic for `range` CA total
Merge b66c83f81 into 1e0231426

1 of 38 new or added lines in 3 files covered. (2.63%)
4 existing lines in 1 file now uncovered.
8372 of 15715 relevant lines covered (53.27%)
0.53 hits per line

Source File: /activity_browser/bwutils/multilca.py (20.58%)
from collections import OrderedDict
from copy import deepcopy
from logging import getLogger
from typing import Iterable, Optional, Tuple, Union

import bw2analyzer as ba
import bw2calc as bc
import numpy as np
import pandas as pd
from PySide2.QtWidgets import QApplication, QMessageBox

from activity_browser.mod import bw2data as bd

from .commontasks import wrap_text
from .errors import ReferenceFlowValueError
from .metadata import AB_metadata

log = getLogger(__name__)
ca = ba.ContributionAnalysis()


class MLCA(object):
    """Wrapper class for performing LCA calculations with many reference flows and impact categories.

    Needs to be passed a brightway ``calculation_setup`` name.

    This class does not subclass the `LCA` class, and performs all
    calculations upon instantiation.

    Initialization creates `self.lca_scores`, which is a NumPy array
    of LCA scores, with rows of reference flows and columns of impact categories.
    Ordering is the same as in the `calculation_setup`.

    This class is adapted from `bw2calc.multi_lca.MultiLCA` and includes a
    number of additional attributes required to perform process- and
    elementary flow contribution analysis (see class `Contributions` below).

    Parameters
    ----------
    cs_name : str
        Name of the calculation setup

    Attributes
    ----------
    func_units_dict
    all_databases
    lca_scores_normalized
    func_units: list
        List of dictionaries, each containing the reference flow key and
        its required output
    fu_activity_keys: list
        The reference flow keys
    fu_index: dict
        Links the reference flows to a specific index
    rev_fu_index: dict
        Same as `fu_index` but using the indexes as keys
    methods: list
        The impact categories of the calculation setup
    method_index: dict
        Links the impact categories to a specific index
    rev_method_index: dict
        Same as `method_index` but using the indexes as keys
    lca: `bw2calc.lca.LCA`
        Brightway LCA instance used to perform LCA, LCI and LCIA
        calculations
    method_matrices: list
        Contains the characterization matrix for each impact category.
    lca_scores: `numpy.ndarray`
        2-dimensional array of shape (`func_units`, `methods`) holding the
        calculated LCA scores of each combination of reference flow and
        impact assessment method
    rev_activity_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    rev_product_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    rev_biosphere_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    scaling_factors: dict
        Contains the life-cycle inventory scaling factors per reference flow
    technosphere_flows: dict
        Contains the calculated technosphere flows per reference flow
    inventory: dict
        Life cycle inventory (biosphere flows) per reference flow
    inventories: dict
        Biosphere flows per reference flow and impact category combination
    characterized_inventories: dict
        Inventory multiplied by scaling (relative impact on environment) per
        reference flow and impact category combination
    elementary_flow_contributions: `numpy.ndarray`
        3-dimensional array of shape (`func_units`, `methods`, `biosphere`)
        which holds the characterized inventory results summed along the
        technosphere axis
    process_contributions: `numpy.ndarray`
        3-dimensional array of shape (`func_units`, `methods`, `technosphere`)
        which holds the characterized inventory results summed along the
        biosphere axis
    func_unit_translation_dict: dict
        Contains the reference flow key and its expected output linked to
        the brightway activity label.
    func_key_dict: dict
        An index of the brightway activity labels
    func_key_list: list
        A derivative of `func_key_dict` containing just the keys

    Raises
    ------
    ValueError
        If the given `cs_name` cannot be found in brightway calculation_setups

    """

    def __init__(self, cs_name: str):
        try:
            cs = bd.calculation_setups[cs_name]
        except KeyError:
            raise ValueError(f"{cs_name} is not a known `calculation_setup`.")

        # Check that all values are non-zero:
        # cs['inv'] contains all reference flows (rf), and the values of each
        # rf are the individual reference flow items.
        if [v for rf in cs["inv"] for v in rf.values() if v == 0]:
            msg = QMessageBox()
            msg.setWindowTitle("Reference flows equal 0")
            msg.setText("All reference flows must be non-zero.")
            msg.setInformativeText(
                "Please enter a valid value before calculating LCA results again."
            )
            msg.setIcon(QMessageBox.Warning)
            QApplication.restoreOverrideCursor()
            msg.exec_()
            raise ReferenceFlowValueError("Reference flow == 0")

        # Reference flows and related indexes
        self.func_units = cs["inv"]
        self.fu_activity_keys = [list(fu.keys())[0] for fu in self.func_units]
        self.fu_index = {k: i for i, k in enumerate(self.fu_activity_keys)}
        self.rev_fu_index = {v: k for k, v in self.fu_index.items()}

        # Methods and related indexes
        self.methods = cs["ia"]
        self.method_index = {m: i for i, m in enumerate(self.methods)}
        self.rev_method_index = {v: k for k, v in self.method_index.items()}

        # Initialize the LCA and prepare the method matrices
        self.lca = self._construct_lca()
        self.lca.lci(factorize=True)
        self.method_matrices = []
        for method in self.methods:
            self.lca.switch_method(method)
            self.method_matrices.append(self.lca.characterization_matrix)

        self.lca_scores = np.zeros((len(self.func_units), len(self.methods)))

        # Data to be stored
        (self.rev_activity_dict, self.rev_product_dict, self.rev_biosphere_dict) = (
            self.lca.reverse_dict()
        )

        # Scaling
        self.scaling_factors = dict()

        # Technosphere product flows for a given reference flow
        self.technosphere_flows = dict()
        # Life cycle inventory (biosphere flows) by reference flow
        self.inventory = dict()
        # Inventory (biosphere flows) for a specific reference flow (e.g. 2000x15000) and impact category.
        self.inventories = dict()
        # Inventory multiplied by scaling (relative impact on environment) per impact category.
        self.characterized_inventories = dict()

        # Summarized contributions for EF and processes.
        self.elementary_flow_contributions = np.zeros(
            (
                len(self.func_units),
                len(self.methods),
                self.lca.biosphere_matrix.shape[0],
            )
        )
        self.process_contributions = np.zeros(
            (
                len(self.func_units),
                len(self.methods),
                self.lca.technosphere_matrix.shape[0],
            )
        )

        self.func_unit_translation_dict = {}
        for fu in self.func_units:
            key = next(iter(fu))
            amount = fu[key]
            act = bd.get_activity(key)
            self.func_unit_translation_dict[
                (
                    f'{act["name"]} | '
                    f'{act["reference product"]} | '
                    f'{act["location"]} | '
                    f'{act["database"]} | '
                    f"{amount}"
                )
            ] = fu
        self.func_key_dict = {
            m: i for i, m in enumerate(self.func_unit_translation_dict.keys())
        }
        self.func_key_list = list(self.func_unit_translation_dict.keys())

    def _construct_lca(self):
        return bc.LCA(demand=self.func_units_dict, method=self.methods[0])

    def _perform_calculations(self):
        """Isolates the code which performs calculations to allow subclasses
        to either alter the code or redo calculations after matrix substitution.
        """
        for row, func_unit in enumerate(self.func_units):
            # Do the LCA for the current reference flow
            try:
                self.lca.redo_lci(func_unit)
            except Exception:
                # bw25 compatibility: demands are keyed by activity id
                key = list(func_unit.keys())[0]
                self.lca.redo_lci({bd.get_activity(key).id: func_unit[key]})

            # Now update the:
            # - Scaling factors
            # - Technosphere flows
            # - Life cycle inventory
            # - Life-cycle inventory (disaggregated by contributing process)
            # for the current reference flow
            self.scaling_factors.update({str(func_unit): self.lca.supply_array})
            self.technosphere_flows.update(
                {
                    str(func_unit): np.multiply(
                        self.lca.supply_array, self.lca.technosphere_matrix.diagonal()
                    )
                }
            )
            self.inventory.update(
                {str(func_unit): np.array(self.lca.inventory.sum(axis=1)).ravel()}
            )
            self.inventories.update({str(func_unit): self.lca.inventory})

            # Now, for each method, take the current reference flow and do inventory analysis
            for col, cf_matrix in enumerate(self.method_matrices):
                self.lca.characterization_matrix = cf_matrix
                self.lca.lcia_calculation()
                self.lca_scores[row, col] = self.lca.score
                self.characterized_inventories[row, col] = (
                    self.lca.characterized_inventory.copy()
                )
                self.elementary_flow_contributions[row, col] = np.array(
                    self.lca.characterized_inventory.sum(axis=1)
                ).ravel()
                self.process_contributions[row, col] = (
                    self.lca.characterized_inventory.sum(axis=0)
                )

    def calculate(self):
        self._perform_calculations()
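    # Typical use of this class, as a minimal sketch (assumes a calculation
    # setup named "my setup" already exists in bd.calculation_setups; the
    # name is illustrative):
    #
    #     mlca = MLCA("my setup")
    #     mlca.calculate()
    #     mlca.lca_scores                 # (len(func_units), len(methods)) array
    #     mlca.lca_scores_to_dataframe()  # labelled scores per impact category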

    @property
    def func_units_dict(self) -> dict:
        """Return a dictionary mapping each reference flow key to a demand of 1."""
        return {key: 1 for func_unit in self.func_units for key in func_unit}

    @property
    def all_databases(self) -> set:
        """Get all databases linked to the reference flows."""

        def get_dependents(dbs: set, dependents: list) -> set:
            for dep in (bd.databases[db].get("depends", []) for db in dependents):
                if not dbs.issuperset(dep):
                    dbs = get_dependents(dbs.union(dep), dep)
            return dbs

        dbs = set(f[0] for f in self.fu_activity_keys)
        dbs = get_dependents(dbs, list(dbs))
        # In rare cases, the default biosphere is not found as a dependency, see:
        # https://github.com/LCA-ActivityBrowser/activity-browser/issues/298
        # Always include it.
        dbs.add(bd.config.biosphere)
        return dbs
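    # Sketch of the dependency closure above (database names are illustrative):
    # with bd.databases like {"db_a": {"depends": ["db_b"]},
    # "db_b": {"depends": ["biosphere3"]}, "biosphere3": {}}, reference flows
    # from "db_a" yield {"db_a", "db_b", "biosphere3"}, after which
    # bd.config.biosphere is always added as a safeguard.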

    def get_results_for_method(self, index: int = 0) -> pd.DataFrame:
        data = self.lca_scores[:, index]
        return pd.DataFrame(data, index=self.fu_activity_keys)

    @property
    def lca_scores_normalized(self) -> np.ndarray:
        """Normalize LCA scores by impact assessment method."""
        return self.lca_scores / self.lca_scores.max(axis=0)
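    # Worked example of the normalization above (values are illustrative):
    # a method column with scores [2.0, 4.0] normalizes to [0.5, 1.0].
    # Note that a column whose maximum is 0 divides by zero and yields
    # inf/NaN values, and negative maxima are not rescaled into [0, 1].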

    def get_normalized_scores_df(self) -> pd.DataFrame:
        """To be used for the currently inactive CorrelationPlot."""
        labels = [str(x + 1) for x in range(len(self.func_units))]
        return pd.DataFrame(data=self.lca_scores_normalized.T, columns=labels)

    def lca_scores_to_dataframe(self) -> pd.DataFrame:
        """Return a dataframe of LCA scores using FU labels as index and
        methods as columns.
        """
        return pd.DataFrame(
            data=self.lca_scores,
            index=pd.Index(self.fu_activity_keys),
            columns=pd.Index(self.methods),
        )

    def get_all_metadata(self) -> None:
        """Populate AB_metadata with relevant database values.

        Set metadata in form of a Pandas DataFrame for biosphere and
        technosphere databases for tables and additional aggregation.
        """
        AB_metadata.add_metadata(self.all_databases)

class Contributions(object):
    """Contribution Analysis built on top of the Multi-LCA class.

    This class requires instantiated MLCA and MetaDataStore objects.

    Parameters
    ----------
    mlca : `MLCA`
        An instantiated MLCA object

    Attributes
    ----------
    DEFAULT_ACT_FIELDS : list
        Default activity/reference flow column names
    DEFAULT_EF_FIELDS : list
        Default environmental flow column names
    mlca: `MLCA`
        Linked `MLCA` instance used for contribution calculations
    act_fields: list
        technosphere-specific metadata column names
    ef_fields: list
        biosphere-specific metadata column names

    Raises
    ------
    ValueError
        If the given `mlca` object is not an instance of `MLCA`

    """

    ACT = "process"
    EF = "elementary_flow"
    TECH = "technosphere"
    BIOS = "biosphere"

    DEFAULT_ACT_FIELDS = ["reference product", "name", "location", "unit", "database"]
    DEFAULT_EF_FIELDS = ["name", "categories", "type", "unit", "database"]

    DEFAULT_ACT_AGGREGATES = ["none"] + DEFAULT_ACT_FIELDS
    DEFAULT_EF_AGGREGATES = ["none"] + DEFAULT_EF_FIELDS

    def __init__(self, mlca):
        if not isinstance(mlca, MLCA):
            raise ValueError(f"Must pass an MLCA object. Passed: {type(mlca)}")
        self.mlca = mlca
        # Ensure MetaDataStore is updated.
        self.mlca.get_all_metadata()

        # Set default metadata keys (those not in the dataframe will be eliminated)
        self.act_fields = AB_metadata.get_existing_fields(self.DEFAULT_ACT_FIELDS)
        self.ef_fields = AB_metadata.get_existing_fields(self.DEFAULT_EF_FIELDS)

        # Specific datastructures for retrieving relevant MLCA data
        # inventory: inventory, reverse index, metadata keys, metadata fields
        self.inventory_data = {
            "biosphere": (
                self.mlca.inventory,
                self.mlca.rev_biosphere_dict,
                self.mlca.fu_activity_keys,
                self.ef_fields,
            ),
            "technosphere": (
                self.mlca.technosphere_flows,
                self.mlca.rev_activity_dict,
                self.mlca.fu_activity_keys,
                self.act_fields,
            ),
        }
        # aggregation: reverse index, metadata keys, metadata fields
        self.aggregate_data = {
            "biosphere": (
                self.mlca.rev_biosphere_dict,
                self.mlca.lca.biosphere_dict,
                self.ef_fields,
            ),
            "technosphere": (
                self.mlca.rev_activity_dict,
                self.mlca.lca.activity_dict,
                self.act_fields,
            ),
        }

    def normalize(self, contribution_array: np.ndarray, total_range: bool = True) -> np.ndarray:
        """Normalize the contribution array based on range or score.

        Parameters
        ----------
        contribution_array : A 2-dimensional contribution array
        total_range : A bool, True for normalization based on range, False for score

        Returns
        -------
        2-dimensional array of the same shape, with scores normalized.

        """
        if total_range:  # total is based on the range
            total = np.abs(contribution_array).sum(axis=1, keepdims=True)
        else:  # total is based on the score
            total = abs(contribution_array.sum(axis=1, keepdims=True))
        return contribution_array / total
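    # Worked example of the two totals above (values are illustrative): for a
    # row [3.0, -1.0],
    #   total_range=True  -> total = |3| + |-1| = 4 -> normalized [0.75, -0.25]
    #   total_range=False -> total = |3 + (-1)| = 2 -> normalized [1.5, -0.5]
    # so range-based totals keep every normalized row within [-1, 1], while
    # score-based totals can exceed it when contributions partially cancel.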

    def _build_dict(
        self,
        contributions: np.ndarray,
        FU_M_index: dict,
        rev_dict: dict,
        limit: int,
        limit_type: str,
        total_range: bool,
    ) -> dict:
        """Sort the given contribution array on method or reference flow column.

        Parameters
        ----------
        contributions : A 2-dimensional contribution array
        FU_M_index : Dictionary which maps the reference flows or methods to their matching columns
        rev_dict : 'Reverse' dictionary used to map the correct activity/method to its value
        limit : Number of top-contributing items to include
        limit_type : Either "number" or "percent", see `ContributionAnalysis.sort_array` for a complete explanation
        total_range : True to normalize on the range, False to normalize on the score

        Returns
        -------
        Top-contributing flows per method or activity

        """
        topcontribution_dict = dict()
        for fu_or_method, col in FU_M_index.items():
            contribution_col = contributions[col, :]
            if total_range:  # total is based on the range
                normalize_to = np.abs(contribution_col).sum()
            else:  # total is based on the score
                normalize_to = contribution_col.sum()
            score = contribution_col.sum()

            top_contribution = ca.sort_array(
                contribution_col, limit=limit, limit_type=limit_type, total=normalize_to
            )

            # Split and calculate the remaining "rest" sections for the positive and negative parts
            pos_rest = (
                np.sum(contribution_col[contribution_col > 0])
                - np.sum(top_contribution[top_contribution[:, 0] > 0][:, 0])
            )
            neg_rest = (
                np.sum(contribution_col[contribution_col < 0])
                - np.sum(top_contribution[top_contribution[:, 0] < 0][:, 0])
            )

            cont_per = OrderedDict()
            cont_per.update(
                {
                    ("Score", ""): score,
                    ("Rest (+)", ""): pos_rest,
                    ("Rest (-)", ""): neg_rest,
                }
            )
            for value, index in top_contribution:
                cont_per.update({rev_dict[index]: value})
            topcontribution_dict.update({fu_or_method: cont_per})
        return topcontribution_dict
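    # The mapping returned above has the shape (keys are illustrative):
    #   {fu_or_method: OrderedDict({("Score", ""): ..., ("Rest (+)", ""): ...,
    #                               ("Rest (-)", ""): ...,
    #                               rev_dict[index]: value, ...})}
    # i.e. three special rows followed by the top contributors for that column.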

    @staticmethod
    def get_labels(
        key_list: pd.MultiIndex,
        fields: Optional[list] = None,
        separator: str = " | ",
        max_length: int = 0,
        mask: Optional[list] = None,
    ) -> list:
        """Generate labels from metadata information.

        Setting max_length will wrap the label into a multi-line string if
        its size is larger than max_length.

        Parameters
        ----------
        key_list : An index containing 'keys' to be retrieved from the MetaDataStore
        fields : List of column-names to be included from the MetaDataStore
        separator : Specific separator to use when joining strings together
        max_length : Allowed character length before a string is wrapped over multiple lines
        mask : Instead of the metadata, this list is used to check keys against.
            Use if data is aggregated or keys do not exist in the MetaDataStore

        Returns
        -------
        Translated and/or joined (and wrapped) labels matching the keys

        """
        fields = (
            fields if fields else ["name", "reference product", "location", "database"]
        )
        keys = (
            k for k in key_list
        )  # need to do this as the keys come from a pd.MultiIndex
        translated_keys = []
        for k in keys:
            if mask and k in mask:
                translated_keys.append(k)
            elif isinstance(k, str):
                translated_keys.append(k)
            elif k in AB_metadata.index:
                translated_keys.append(
                    separator.join(
                        [str(l) for l in list(AB_metadata.get_metadata(k, fields))]
                    )
                )
            else:
                translated_keys.append(separator.join([i for i in k if i != ""]))
        if max_length:
            translated_keys = [
                wrap_text(k, max_length=max_length) for k in translated_keys
            ]
        return translated_keys
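    # E.g. (illustrative): a key ("db", "code") present in AB_metadata becomes
    # "name | reference product | location | database"; plain strings and keys
    # found in `mask` pass through unchanged, and any other tuple is joined on
    # the separator with empty elements dropped.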

    @classmethod
    def join_df_with_metadata(
        cls,
        df: pd.DataFrame,
        x_fields: Optional[list] = None,
        y_fields: Optional[list] = None,
        special_keys: Optional[list] = None,
    ) -> pd.DataFrame:
        """Join a dataframe that has keys on the index with metadata.

        Metadata fields are defined in x_fields.
        If columns are also keys (and not, e.g., method names), they can also
        be replaced with metadata, if y_fields are provided.

        Parameters
        ----------
        df : Simple DataFrame containing processed data
        x_fields : List of additional columns to add from the MetaDataStore
        y_fields : List of column keys for the data in the df dataframe
        special_keys : List of specific items to place at the top of the dataframe

        Returns
        -------
        Expanded and metadata-annotated dataframe

        """

        # Replace column keys with labels
        df.columns = cls.get_labels(df.columns, fields=y_fields)
        # Coerce index to MultiIndex if it currently isn't
        if not isinstance(df.index, pd.MultiIndex):
            df.index = pd.MultiIndex.from_tuples(ids_to_keys(df.index))

        # Get metadata for rows
        keys = [k for k in df.index if k in AB_metadata.index]
        metadata = AB_metadata.get_metadata(keys, x_fields)

        # Join data with metadata
        joined = metadata.join(df, how="outer")

        if special_keys:
            # Replace index keys with labels
            try:  # first pin the special keys (Score/Total, Rest (+), Rest (-)) to the top of the dataframe
                complete_index = special_keys + keys
                joined = joined.reindex(complete_index, axis="index", fill_value=0.0)
            except Exception:
                log.error(
                    "Could not put 'Total', 'Rest (+)' and 'Rest (-)' on positions 0, 1 and 2 in the dataframe."
                )
        joined.index = cls.get_labels(joined.index, fields=x_fields)
        return joined
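    # Sketch of the join above (keys are illustrative): a df indexed by keys
    # such as ("db", "code") gains the metadata columns named in x_fields, the
    # special_keys rows ("Score"/"Rest") are pinned to the top, and the key
    # index is finally replaced by human-readable labels via get_labels().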

    def get_labelled_contribution_dict(
        self,
        cont_dict: dict,
        x_fields: Optional[list] = None,
        y_fields: Optional[list] = None,
        mask: Optional[list] = None,
    ) -> pd.DataFrame:
        """Annotate the contribution dict with metadata.

        Parameters
        ----------
        cont_dict : Holds the contribution data connected to the reference flows or methods
        x_fields : X-axis fieldnames, these are usually the indexes/keys of specific processes
        y_fields : Column names specific to the cont_dict to be labelled
        mask : Used in case of aggregation or special cases where the usual way of using the metadata cannot be used

        Returns
        -------
        Annotated contribution dict inside a pandas dataframe

        """
        dfs = (
            pd.DataFrame(v.values(), index=list(v.keys()), columns=[k])
            for k, v in cont_dict.items()
        )
        df = pd.concat(dfs, sort=False, axis=1)
        # If the cont_dict has tuples for keys, coerce df.columns into MultiIndex
        if all(isinstance(k, tuple) for k in cont_dict.keys()):
            df.columns = pd.MultiIndex.from_tuples(df.columns)

        special_keys = [("Score", ""), ("Rest (+)", ""), ("Rest (-)", "")]
        # Replace all 0 values with NaN; rows containing only NaNs are dropped below
        df = df.replace(0, np.nan)

        # Sort on the absolute mean of each row,
        # but only sort if there is something to sort
        df_bot = deepcopy(df.iloc[3:, :])
        if len(df_bot) > 1:
            df_bot["_sort_me_"] = df_bot.select_dtypes(include=np.number).apply(
                lambda row: np.nanmean(np.abs(row)), axis=1
            )
            df_bot.sort_values(by="_sort_me_", ascending=False, inplace=True)
            del df_bot["_sort_me_"]

        df = pd.concat([df.iloc[:3, :], df_bot], axis=0)
        df.dropna(how="all", inplace=True)

        if not mask:
            joined = self.join_df_with_metadata(
                df, x_fields=x_fields, y_fields=y_fields, special_keys=special_keys
            )
        else:
            df.columns = self.get_labels(df.columns, fields=y_fields)
            keys = [k for k in df.index if k in mask]
            combined_keys = special_keys + keys
            # Reindex the combined_keys to ensure they always exist in the dataframe;
            # this avoids keys with 0 values not existing due to the 'dropna' action above.
            df = df.reindex(combined_keys, axis="index", fill_value=0.0)
            df.index = self.get_labels(df.index, mask=mask)
            joined = df
        if joined is not None:
            return joined.reset_index(drop=False)

    @staticmethod
    def adjust_table_unit(df: pd.DataFrame, method: Optional[tuple]) -> pd.DataFrame:
        """Given a dataframe, adjust the unit of the table to either match the given method, or not exist."""
        if "unit" not in df.columns:
            return df
        keys = df.index[~df["index"].isin({"Score", "Rest (+)", "Rest (-)"})]
        unit = bd.Method(method).metadata.get("unit") if method else "unit"
        df.loc[keys, "unit"] = unit
        return df

    @staticmethod
    def _build_inventory(
        inventory: dict, indices: dict, columns: list, fields: list
    ) -> pd.DataFrame:
        df = pd.DataFrame(inventory)
        df.index = pd.MultiIndex.from_tuples(ids_to_keys(indices.values()))
        df.columns = Contributions.get_labels(columns, max_length=30)
        metadata = AB_metadata.get_metadata(list(ids_to_keys(indices.values())), fields)
        joined = metadata.join(df)
        joined.reset_index(inplace=True, drop=True)
        return joined

    def inventory_df(
        self, inventory_type: str, columns: set = {"name", "database", "code"}
    ) -> pd.DataFrame:
        """Return an inventory dataframe with metadata of the given type."""
        try:
            data = self.inventory_data[inventory_type]
            appending = columns.difference(set(data[3]))
            for clmn in appending:
                data[3].append(clmn)
        except KeyError:
            raise ValueError(
                "Type must be either 'biosphere' or 'technosphere', "
                "'{}' given.".format(inventory_type)
            )
        return self._build_inventory(*data)
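    # E.g. (illustrative): contributions.inventory_df("biosphere") returns the
    # per-reference-flow biosphere inventory annotated with EF metadata. Note
    # that the method extends the shared fields list in self.inventory_data in
    # place, so extra columns requested once persist for later calls.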

    def _build_lca_scores_df(self, scores: np.ndarray) -> pd.DataFrame:
        df = pd.DataFrame(
            scores,
            index=pd.MultiIndex.from_tuples(self.mlca.fu_activity_keys),
            columns=self.mlca.methods,
        )
        # Add amounts column.
        df["amount"] = [next(iter(fu.values()), 1.0) for fu in self.mlca.func_units]
        joined = Contributions.join_df_with_metadata(
            df, x_fields=self.act_fields, y_fields=None
        )
        # Precisely order the columns that are shown in the LCA Results overview
        # tab: “X kg of product Y from activity Z in location L, and database D”
        col_order = pd.Index(
            [
                "amount",
                "unit",
                "reference product",
                "name",
                "location",
                "database",
            ]
        )
        methods = joined.columns.difference(col_order, sort=False)
        joined = joined.loc[:, col_order.append(methods)]
        return joined.reset_index(drop=False)

    def lca_scores_df(self, normalized: bool = False) -> pd.DataFrame:
        """Return a metadata-annotated DataFrame of the LCA scores."""
        scores = (
            self.mlca.lca_scores if not normalized else self.mlca.lca_scores_normalized
        )
        return self._build_lca_scores_df(scores)

    @staticmethod
    def _build_contributions(data: np.ndarray, index: int, axis: int) -> np.ndarray:
        return data.take(index, axis=axis)

    def get_contributions(
        self, contribution, functional_unit=None, method=None, **kwargs
    ) -> np.ndarray:
        """Return a contribution matrix given the type and fu / method."""
        if all([functional_unit, method]) or not any([functional_unit, method]):
            raise ValueError(
                "Contributions must be requested either by reference flow or by impact category. Provided:"
                "\n Reference flow: {} \n Impact category: {}".format(
                    functional_unit, method
                )
            )
        dataset = {
            "process": self.mlca.process_contributions,
            "elementary_flow": self.mlca.elementary_flow_contributions,
        }
        if method:
            return self._build_contributions(
                dataset[contribution], self.mlca.method_index[method], 1
            )
        elif functional_unit:
            return self._build_contributions(
                dataset[contribution], self.mlca.func_key_dict[functional_unit], 0
            )
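    # E.g. (illustrative): per-process contributions for one impact category:
    #
    #     contributions.get_contributions("process", method=mlca.methods[0])
    #
    # Exactly one of functional_unit / method may be given; the matching axis
    # of the 3-dimensional contribution array is selected with np.take.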

    def aggregate_by_parameters(
        self,
        contributions: np.ndarray,
        inventory: str,
        parameters: Union[str, list, None] = None,
    ):
        """Perform aggregation of the contribution data given parameters.

        Parameters
        ----------
        contributions : 2-dimensional contribution array
        inventory : Either 'biosphere' or 'technosphere', used to determine which inventory to use
        parameters : One or more parameters by which to aggregate the given contribution array

        Returns
        -------
        aggregated : np.ndarray
            The aggregated 2-dimensional contribution array
        mask_index : dict
            Contains all of the values of the aggregation mask, linked to their indexes
        mask : list or dictview or None
            An optional list or dictview of the mask_index values

        """
        rev_index, keys, fields = self.aggregate_data[inventory]
        if not parameters:
            return contributions, rev_index, None

        df = pd.DataFrame(contributions).T
        columns = list(range(contributions.shape[0]))
        df.index = pd.MultiIndex.from_tuples(rev_index.values())
        metadata = AB_metadata.get_metadata(list(keys), fields)

        joined = metadata.join(df)
        joined.reset_index(inplace=True, drop=True)
        grouped = joined.groupby(parameters)
        aggregated = grouped[columns].sum()
        mask_index = {i: m for i, m in enumerate(aggregated.index)}

        return aggregated.T.values, mask_index, mask_index.values()
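    # Sketch of the aggregation above (parameter name is illustrative): given a
    # (reference flows x flows) array, aggregating by "name" joins the flow
    # metadata, groups rows sharing a name and sums each group, then returns
    # the transposed totals plus an {index: name} map used for relabelling.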

    def _contribution_rows(self, contribution: str, aggregator=None):
        if aggregator is None:
            return self.act_fields if contribution == self.ACT else self.ef_fields
        return aggregator if isinstance(aggregator, list) else [aggregator]

    def _correct_method_index(self, mthd_indx: list) -> dict:
        """Amend the tuples for impact method labels so that all tuples
        are fully printed.

        NOTE: THE AMENDED TUPLES ARE COPIES; THIS SHOULD NOT BE USED TO
        ASSIGN OR MODIFY THE UNDERLYING DATA STRUCTURES!

        mthd_indx: a list of tuples for the impact method names
        """
        method_tuple_length = max([len(k) for k in mthd_indx])
        conv_dict = dict()
        for v, mthd in enumerate(mthd_indx):
            if len(mthd) < method_tuple_length:
                _l = list(mthd)
                for i in range(len(mthd), method_tuple_length):
                    _l.append("")
                mthd = tuple(_l)
            conv_dict[mthd] = v
        return conv_dict

    def _contribution_index_cols(self, **kwargs) -> Tuple[dict, Optional[Iterable]]:
        if kwargs.get("method") is not None:
            return self.mlca.fu_index, self.act_fields
        return self._correct_method_index(self.mlca.methods), None
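    # E.g. (illustrative): method tuples of unequal length such as
    # ("IPCC", "GWP100") and ("ReCiPe",) are padded to the longest length,
    # giving {("IPCC", "GWP100"): 0, ("ReCiPe", ""): 1}, so every label
    # prints all of its parts.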

    def top_elementary_flow_contributions(
        self,
        functional_unit: Optional[tuple] = None,
        method: Optional[tuple] = None,
        aggregator: Union[str, list, None] = None,
        limit: int = 5,
        normalize: bool = False,
        limit_type: str = "number",
        total_range: bool = True,
        **kwargs,
    ) -> pd.DataFrame:
        """Return top EF contributions for either functional_unit or method.

        * If functional_unit: Compare the reference flow against all considered
          impact assessment methods.
        * If method: Compare the method against all involved processes.

        Parameters
        ----------
        functional_unit : The reference flow to compare all considered impact categories against
        method : The method to compare all considered reference flows against
        aggregator : Used to aggregate EF contributions over certain columns
        limit : The number of top contributions to consider
        normalize : Determines whether or not to normalize the contribution values
        limit_type : The type of limit, either 'number' or 'percent'
        total_range : Whether the total for contributions is the range (True) or the score (False)

        Returns
        -------
        Annotated top-contribution dataframe

        """
        contributions = self.get_contributions(
            self.EF, functional_unit, method, **kwargs
        )

        x_fields = self._contribution_rows(self.EF, aggregator)
        index, y_fields = self._contribution_index_cols(
            functional_unit=functional_unit, method=method
        )
        contributions, rev_index, mask = self.aggregate_by_parameters(
            contributions, self.BIOS, aggregator
        )

        # Normalize if required
        if normalize:
            contributions = self.normalize(contributions, total_range)

        top_cont_dict = self._build_dict(
            contributions, index, rev_index, limit, limit_type, total_range
        )
        labelled_df = self.get_labelled_contribution_dict(
            top_cont_dict, x_fields=x_fields, y_fields=y_fields, mask=mask
        )
        self.adjust_table_unit(labelled_df, method)
        return labelled_df

    def top_process_contributions(
        self,
        functional_unit: Optional[tuple] = None,
        method: Optional[tuple] = None,
        aggregator: Union[str, list, None] = None,
        limit: int = 5,
        normalize: bool = False,
        limit_type: str = "number",
        total_range: bool = True,
        **kwargs,
    ) -> pd.DataFrame:
        """Return top process contributions for functional_unit or method.

        * If functional_unit: Compare the reference flow against all considered
          impact assessment methods.
        * If method: Compare the method against all involved processes.

        Parameters
        ----------
        functional_unit : The reference flow to compare all considered impact categories against
        method : The method to compare all considered reference flows against
        aggregator : Used to aggregate process contributions over certain columns
        limit : The number of top contributions to consider
        normalize : Determines whether or not to normalize the contribution values
        limit_type : The type of limit, either 'number' or 'percent'
        total_range : Whether the total for contributions is the range (True) or the score (False)

        Returns
        -------
        Annotated top-contribution dataframe

        """
        contributions = self.get_contributions(
            self.ACT, functional_unit, method, **kwargs
        )

        x_fields = self._contribution_rows(self.ACT, aggregator)
        index, y_fields = self._contribution_index_cols(
            functional_unit=functional_unit, method=method
        )
        contributions, rev_index, mask = self.aggregate_by_parameters(
            contributions, self.TECH, aggregator
        )

        # Normalize if required
        if normalize:
            contributions = self.normalize(contributions, total_range)

        top_cont_dict = self._build_dict(
            contributions, index, rev_index, limit, limit_type, total_range
        )
        labelled_df = self.get_labelled_contribution_dict(
            top_cont_dict, x_fields=x_fields, y_fields=y_fields, mask=mask
        )
        self.adjust_table_unit(labelled_df, method)
        return labelled_df


def ids_to_keys(index_list):
    return [bd.get_activity(i).key if isinstance(i, int) else i for i in index_list]
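

if __name__ == "__main__":
    # Minimal end-to-end sketch, not part of the original module logic: it
    # assumes an existing brightway project and a calculation setup named
    # "example setup" (both names are illustrative placeholders).
    bd.projects.set_current("default")
    mlca = MLCA("example setup")
    mlca.calculate()
    print(mlca.lca_scores_to_dataframe())
    contributions = Contributions(mlca)
    # Top process contributions for the first impact category of the setup.
    print(contributions.top_process_contributions(method=mlca.methods[0]))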