LCA-ActivityBrowser / activity-browser / 12764988589

14 Jan 2025 09:43AM UTC coverage: 53.273% (-0.04%) from 53.311%

Pull Request #1430: fix wrong logic for `range` CA total
Merge 989311b57 into 1e0231426

1 of 7 new or added lines in 1 file covered. (14.29%)
5 existing lines in 1 file now uncovered.
8367 of 15706 relevant lines covered (53.27%)
0.53 hits per line

Source File: /activity_browser/bwutils/multilca.py (file coverage: 20.71%)

from collections import OrderedDict
from copy import deepcopy
from typing import Iterable, Optional, Union
from logging import getLogger

import bw2analyzer as ba
import bw2calc as bc
import numpy as np
import pandas as pd
from PySide2.QtWidgets import QApplication, QMessageBox

from activity_browser.mod import bw2data as bd

from .commontasks import wrap_text
from .errors import ReferenceFlowValueError
from .metadata import AB_metadata

log = getLogger(__name__)
ca = ba.ContributionAnalysis()


class MLCA(object):
    """Wrapper class for performing LCA calculations with many reference flows and impact categories.

    Needs to be passed a brightway ``calculation_setup`` name.

    This class does not subclass the `LCA` class; all calculations are
    performed when `calculate()` is called.

    Initialization allocates `self.lca_scores`, a NumPy array of LCA scores
    with rows of reference flows and columns of impact categories; it is
    filled by `calculate()`. Ordering is the same as in the
    `calculation_setup`.

    This class is adapted from `bw2calc.multi_lca.MultiLCA` and includes a
    number of additional attributes required to perform process- and
    elementary flow contribution analysis (see class `Contributions` below).

    Parameters
    ----------
    cs_name : str
        Name of the calculation setup

    Attributes
    ----------
    func_units_dict
    all_databases
    lca_scores_normalized
    func_units: list
        List of dictionaries, each containing the reference flow key and
        its required output
    fu_activity_keys: list
        The reference flow keys
    fu_index: dict
        Links the reference flows to a specific index
    rev_fu_index: dict
        Same as `fu_index` but using the indexes as keys
    methods: list
        The impact categories of the calculation setup
    method_index: dict
        Links the impact categories to a specific index
    rev_method_index: dict
        Same as `method_index` but using the indexes as keys
    lca: `bw2calc.lca.LCA`
        Brightway LCA instance used to perform LCA, LCI and LCIA
        calculations
    method_matrices: list
        Contains the characterization matrix for each impact category.
    lca_scores: `numpy.ndarray`
        2-dimensional array of shape (`func_units`, `methods`) holding the
        calculated LCA scores of each combination of reference flow and
        impact assessment method
    rev_activity_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    rev_product_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    rev_biosphere_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    scaling_factors: dict
        Contains the life-cycle inventory scaling factors per reference flow
    technosphere_flows: dict
        Contains the calculated technosphere flows per reference flow
    inventory: dict
        Life cycle inventory (biosphere flows) per reference flow
    inventories: dict
        Biosphere flows per reference flow and impact category combination
    characterized_inventories: dict
        Inventory multiplied by scaling (relative impact on environment) per
        reference flow and impact category combination
    elementary_flow_contributions: `numpy.ndarray`
        3-dimensional array of shape (`func_units`, `methods`, `biosphere`)
        which holds the characterized inventory results summed along the
        technosphere axis
    process_contributions: `numpy.ndarray`
        3-dimensional array of shape (`func_units`, `methods`, `technosphere`)
        which holds the characterized inventory results summed along the
        biosphere axis
    func_unit_translation_dict: dict
        Contains the reference flow key and its expected output linked to
        the brightway activity label.
    func_key_dict: dict
        An index of the brightway activity labels
    func_key_list: list
        A derivative of `func_key_dict` containing just the keys

    Raises
    ------
    ValueError
        If the given `cs_name` cannot be found in brightway calculation_setups

    """
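
    # A minimal usage sketch (hypothetical setup name; assumes a Brightway
    # project with a registered calculation setup holding at least one
    # reference flow and one impact category):
    #
    #   mlca = MLCA("my setup")
    #   mlca.calculate()
    #   mlca.lca_scores.shape          # (len(func_units), len(methods))
    #   df = mlca.lca_scores_to_dataframe()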

    def __init__(self, cs_name: str):
        try:
            cs = bd.calculation_setups[cs_name]
        except KeyError:
            raise ValueError(f"{cs_name} is not a known `calculation_setup`.")

        # Check that all reference flow values are non-zero.
        # cs['inv'] contains all reference flows (rf);
        # the values of each rf are the individual reference flow amounts.
        if [v for rf in cs["inv"] for v in rf.values() if v == 0]:
            msg = QMessageBox()
            msg.setWindowTitle("Reference flows equal 0")
            msg.setText("All reference flows must be non-zero.")
            msg.setInformativeText(
                "Please enter a valid value before calculating LCA results again."
            )
            msg.setIcon(QMessageBox.Warning)
            QApplication.restoreOverrideCursor()
            msg.exec_()
            raise ReferenceFlowValueError("Reference flow == 0")

        # Reference flows and related indexes
        self.func_units = cs["inv"]
        self.fu_activity_keys = [list(fu.keys())[0] for fu in self.func_units]
        self.fu_index = {k: i for i, k in enumerate(self.fu_activity_keys)}
        self.rev_fu_index = {v: k for k, v in self.fu_index.items()}

        # Methods and related indexes
        self.methods = cs["ia"]
        self.method_index = {m: i for i, m in enumerate(self.methods)}
        self.rev_method_index = {v: k for k, v in self.method_index.items()}

        # Initial LCA and prepared method matrices
        self.lca = self._construct_lca()
        self.lca.lci(factorize=True)
        self.method_matrices = []
        for method in self.methods:
            self.lca.switch_method(method)
            self.method_matrices.append(self.lca.characterization_matrix)

        self.lca_scores = np.zeros((len(self.func_units), len(self.methods)))

        # Data to be stored
        (self.rev_activity_dict, self.rev_product_dict, self.rev_biosphere_dict) = (
            self.lca.reverse_dict()
        )

        # Scaling
        self.scaling_factors = dict()

        # Technosphere product flows for a given reference flow
        self.technosphere_flows = dict()
        # Life cycle inventory (biosphere flows) by reference flow
        self.inventory = dict()
        # Inventory (biosphere flows) for a specific reference flow (e.g. 2000x15000) and impact category.
        self.inventories = dict()
        # Inventory multiplied by scaling (relative impact on environment) per impact category.
        self.characterized_inventories = dict()

        # Summarized contributions for elementary flows and processes.
        self.elementary_flow_contributions = np.zeros(
            (
                len(self.func_units),
                len(self.methods),
                self.lca.biosphere_matrix.shape[0],
            )
        )
        self.process_contributions = np.zeros(
            (
                len(self.func_units),
                len(self.methods),
                self.lca.technosphere_matrix.shape[0],
            )
        )

        self.func_unit_translation_dict = {}
        for fu in self.func_units:
            key = next(iter(fu))
            amount = fu[key]
            act = bd.get_activity(key)
            self.func_unit_translation_dict[
                (
                    f'{act["name"]} | '
                    f'{act["reference product"]} | '
                    f'{act["location"]} | '
                    f'{act["database"]} | '
                    f"{amount}"
                )
            ] = fu
        self.func_key_dict = {
            m: i for i, m in enumerate(self.func_unit_translation_dict.keys())
        }
        self.func_key_list = list(self.func_unit_translation_dict.keys())

    def _construct_lca(self):
        return bc.LCA(demand=self.func_units_dict, method=self.methods[0])

    def _perform_calculations(self):
        """Isolates the code which performs calculations to allow subclasses
        to either alter the code or redo calculations after matrix substitution.
        """
        for row, func_unit in enumerate(self.func_units):
            # Do the LCA for the current reference flow
            try:
                self.lca.redo_lci(func_unit)
            except Exception:
                # bw25 compatibility: demand dicts are keyed by integer ids
                key = list(func_unit.keys())[0]
                self.lca.redo_lci({bd.get_activity(key).id: func_unit[key]})

            # Now update the:
            # - Scaling factors
            # - Technosphere flows
            # - Life cycle inventory
            # - Life-cycle inventory (disaggregated by contributing process)
            # for the current reference flow
            self.scaling_factors.update({str(func_unit): self.lca.supply_array})
            self.technosphere_flows.update(
                {
                    str(func_unit): np.multiply(
                        self.lca.supply_array, self.lca.technosphere_matrix.diagonal()
                    )
                }
            )
            self.inventory.update(
                {str(func_unit): np.array(self.lca.inventory.sum(axis=1)).ravel()}
            )
            self.inventories.update({str(func_unit): self.lca.inventory})

            # Now, for each method, take the current reference flow and do inventory analysis
            for col, cf_matrix in enumerate(self.method_matrices):
                self.lca.characterization_matrix = cf_matrix
                self.lca.lcia_calculation()
                self.lca_scores[row, col] = self.lca.score
                self.characterized_inventories[row, col] = (
                    self.lca.characterized_inventory.copy()
                )
                self.elementary_flow_contributions[row, col] = np.array(
                    self.lca.characterized_inventory.sum(axis=1)
                ).ravel()
                self.process_contributions[row, col] = (
                    self.lca.characterized_inventory.sum(axis=0)
                )

    def calculate(self):
        self._perform_calculations()

    @property
    def func_units_dict(self) -> dict:
        """Return a dictionary mapping each reference flow key to a demand of 1."""
        return {key: 1 for func_unit in self.func_units for key in func_unit}
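
    # For example (hypothetical keys): with func_units
    # [{("db", "a"): 2.0}, {("db", "b"): 5.0}] this property yields
    # {("db", "a"): 1, ("db", "b"): 1}. It is only used to build the initial
    # factorized LCA object; the actual per-flow amounts are applied in
    # _perform_calculations via redo_lci.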

    @property
    def all_databases(self) -> set:
        """Get all databases linked to the reference flows."""

        def get_dependents(dbs: set, dependents: list) -> set:
            for dep in (bd.databases[db].get("depends", []) for db in dependents):
                if not dbs.issuperset(dep):
                    dbs = get_dependents(dbs.union(dep), dep)
            return dbs

        dbs = set(f[0] for f in self.fu_activity_keys)
        dbs = get_dependents(dbs, list(dbs))
        # In rare cases, the default biosphere is not found as a dependency, see:
        # https://github.com/LCA-ActivityBrowser/activity-browser/issues/298
        # Always include it.
        dbs.add(bd.config.biosphere)
        return dbs
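
    # Sketch of the recursive dependency walk (hypothetical database names):
    # if the reference flows live in "db_a", "db_a" depends on "db_b", and
    # "db_b" depends on "biosphere3", the result is
    # {"db_a", "db_b", "biosphere3"}, plus bd.config.biosphere, which is
    # always added explicitly (see the linked issue #298).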

    def get_results_for_method(self, index: int = 0) -> pd.DataFrame:
        data = self.lca_scores[:, index]
        return pd.DataFrame(data, index=self.fu_activity_keys)

    @property
    def lca_scores_normalized(self) -> np.ndarray:
        """Normalize LCA scores by impact assessment method."""
        return self.lca_scores / self.lca_scores.max(axis=0)
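
    # Worked example (illustrative numbers): for two reference flows and one
    # method with scores [[2.0], [4.0]], the column maximum is 4.0, so the
    # normalized scores are [[0.5], [1.0]]; the largest score per impact
    # category always becomes 1.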

    def get_normalized_scores_df(self) -> pd.DataFrame:
        """To be used for the currently inactive CorrelationPlot."""
        labels = [str(x + 1) for x in range(len(self.func_units))]
        return pd.DataFrame(data=self.lca_scores_normalized.T, columns=labels)

    def lca_scores_to_dataframe(self) -> pd.DataFrame:
        """Return a dataframe of LCA scores using FU labels as index and
        methods as columns.
        """
        return pd.DataFrame(
            data=self.lca_scores,
            index=pd.Index(self.fu_activity_keys),
            columns=pd.Index(self.methods),
        )

    def get_all_metadata(self) -> None:
        """Populate AB_metadata with relevant database values.

        Set metadata in form of a Pandas DataFrame for biosphere and
        technosphere databases for tables and additional aggregation.
        """
        AB_metadata.add_metadata(self.all_databases)


class Contributions(object):
    """Contribution Analysis built on top of the Multi-LCA class.

    This class requires instantiated MLCA and MetaDataStore objects.

    Parameters
    ----------
    mlca : `MLCA`
        An instantiated MLCA object

    Attributes
    ----------
    DEFAULT_ACT_FIELDS : list
        Default activity/reference flow column names
    DEFAULT_EF_FIELDS : list
        Default environmental flow column names
    mlca: `MLCA`
        Linked `MLCA` instance used for contribution calculations
    act_fields: list
        Technosphere-specific metadata column names
    ef_fields: list
        Biosphere-specific metadata column names

    Raises
    ------
    ValueError
        If the given `mlca` object is not an instance of `MLCA`

    """

    ACT = "process"
    EF = "elementary_flow"
    TECH = "technosphere"
    BIOS = "biosphere"

    DEFAULT_ACT_FIELDS = ["reference product", "name", "location", "unit", "database"]
    DEFAULT_EF_FIELDS = ["name", "categories", "type", "unit", "database"]

    DEFAULT_ACT_AGGREGATES = ["none"] + DEFAULT_ACT_FIELDS
    DEFAULT_EF_AGGREGATES = ["none"] + DEFAULT_EF_FIELDS

    def __init__(self, mlca):
        if not isinstance(mlca, MLCA):
            raise ValueError(f"Must pass an MLCA object. Passed: {type(mlca)}")
        self.mlca = mlca
        # Ensure MetaDataStore is updated.
        self.mlca.get_all_metadata()

        # Set default metadata keys (those not in the dataframe will be eliminated)
        self.act_fields = AB_metadata.get_existing_fields(self.DEFAULT_ACT_FIELDS)
        self.ef_fields = AB_metadata.get_existing_fields(self.DEFAULT_EF_FIELDS)

        # Specific datastructures for retrieving relevant MLCA data
        # inventory: inventory, reverse index, metadata keys, metadata fields
        self.inventory_data = {
            "biosphere": (
                self.mlca.inventory,
                self.mlca.rev_biosphere_dict,
                self.mlca.fu_activity_keys,
                self.ef_fields,
            ),
            "technosphere": (
                self.mlca.technosphere_flows,
                self.mlca.rev_activity_dict,
                self.mlca.fu_activity_keys,
                self.act_fields,
            ),
        }
        # aggregation: reverse index, metadata keys, metadata fields
        self.aggregate_data = {
            "biosphere": (
                self.mlca.rev_biosphere_dict,
                self.mlca.lca.biosphere_dict,
                self.ef_fields,
            ),
            "technosphere": (
                self.mlca.rev_activity_dict,
                self.mlca.lca.activity_dict,
                self.act_fields,
            ),
        }

    def normalize(self, contribution_array: np.ndarray, total_range: bool = True) -> np.ndarray:
        """Normalize the contribution array based on range or score.

        Parameters
        ----------
        contribution_array : A 2-dimensional contribution array
        total_range : A bool, True for normalization based on range, False for score

        Returns
        -------
        2-dimensional array of same shape, with scores normalized.

        """
        if total_range:  # total is based on the range
            total = abs(contribution_array).sum(axis=1, keepdims=True)
        else:  # total is based on the score
            total = abs(contribution_array.sum(axis=1, keepdims=True))
        return contribution_array / total
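
    # Worked example (illustrative numbers) of the range/score distinction
    # that this PR adjusts: for a contribution row [4.0, -1.0],
    #   range: total = |4.0| + |-1.0| = 5.0 -> normalized [0.8, -0.2]
    #   score: total = |4.0 + -1.0| = 3.0   -> normalized [1.33..., -0.33...]
    # With mixed-sign contributions the range total is always >= the score
    # total, so range-normalized values stay within [-1, 1].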

    def _build_dict(
        self,
        contributions: np.ndarray,
        FU_M_index: dict,
        rev_dict: dict,
        limit: int,
        limit_type: str,
        total_range: bool,
    ) -> dict:
        """Sort the given contribution array on method or reference flow column.

        Parameters
        ----------
        contributions : A 2-dimensional contribution array
        FU_M_index : Dictionary which maps the reference flows or methods to their matching columns
        rev_dict : 'Reverse' dictionary used to map the correct activity/method to its value
        limit : Number of top-contributing items to include
        limit_type : Either "number" or "percent", see ContributionAnalysis.sort_array for a complete explanation
        total_range : True to compute the total from the range (sum of absolute contributions), False to use the score (plain sum)

        Returns
        -------
        Top-contributing flows per method or activity

        """
        topcontribution_dict = dict()
        for fu_or_method, col in FU_M_index.items():
            contribution_col = contributions[col, :]
            if total_range:  # total is based on the range
                total = np.abs(contribution_col).sum()
            else:  # total is based on the score
                total = contribution_col.sum()

            top_contribution = ca.sort_array(
                contribution_col, limit=limit, limit_type=limit_type, total=total
            )

            # Split and calculate the remaining "rest" sections for the positive and negative parts
            pos_rest = (
                np.sum(contribution_col[contribution_col > 0])
                - np.sum(top_contribution[top_contribution[:, 0] > 0][:, 0])
            )
            neg_rest = (
                np.sum(contribution_col[contribution_col < 0])
                - np.sum(top_contribution[top_contribution[:, 0] < 0][:, 0])
            )

            cont_per = OrderedDict()
            cont_per.update(
                {
                    ("Total", ""): total,
                    ("Rest (+)", ""): pos_rest,
                    ("Rest (-)", ""): neg_rest,
                }
            )
            for value, index in top_contribution:
                cont_per.update({rev_dict[index]: value})
            topcontribution_dict.update({fu_or_method: cont_per})
        return topcontribution_dict
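
    # Illustrative example (assuming ca.sort_array keeps the two
    # largest-magnitude entries for limit=2, limit_type="number"): for a
    # column [5.0, 3.0, -2.0, 1.0, -4.0] the top entries are 5.0 and -4.0, so
    #   pos_rest = (5 + 3 + 1) - 5 = 4.0    (positive values not shown)
    #   neg_rest = (-2 + -4) - (-4) = -2.0  (negative values not shown)
    # "Rest (+)" and "Rest (-)" thus account for everything outside the top
    # contributions without letting opposite signs cancel each other out.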

    @staticmethod
    def get_labels(
        key_list: pd.MultiIndex,
        fields: Optional[list] = None,
        separator: str = " | ",
        max_length: int = False,
        mask: Optional[list] = None,
    ) -> list:
        """Generate labels from metadata information.

        Setting max_length will wrap the label into a multi-line string if
        its size is larger than max_length.

        Parameters
        ----------
        key_list : An index containing 'keys' to be retrieved from the MetaDataStore
        fields : List of column names to be included from the MetaDataStore
        separator : Specific separator to use when joining strings together
        max_length : Allowed character length before a string is wrapped over multiple lines
        mask : Instead of the metadata, this list is used to check keys against.
            Use if data is aggregated or keys do not exist in the MetaDataStore

        Returns
        -------
        Translated and/or joined (and wrapped) labels matching the keys

        """
        fields = (
            fields if fields else ["name", "reference product", "location", "database"]
        )
        keys = (
            k for k in key_list
        )  # need to do this as the keys come from a pd.MultiIndex
        translated_keys = []
        for k in keys:
            if mask and k in mask:
                translated_keys.append(k)
            elif isinstance(k, str):
                translated_keys.append(k)
            elif k in AB_metadata.index:
                translated_keys.append(
                    separator.join(
                        [str(l) for l in list(AB_metadata.get_metadata(k, fields))]
                    )
                )
            else:
                translated_keys.append(separator.join([i for i in k if i != ""]))
        if max_length:
            translated_keys = [
                wrap_text(k, max_length=max_length) for k in translated_keys
            ]
        return translated_keys
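
    # For example (hypothetical metadata): a key ("db", "abc") whose metadata
    # row has name="steel", reference product="steel", location="DE" and
    # database="db" becomes the label "steel | steel | DE | db"; with
    # max_length set, long labels are additionally wrapped by wrap_text.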

    @classmethod
    def join_df_with_metadata(
        cls,
        df: pd.DataFrame,
        x_fields: Optional[list] = None,
        y_fields: Optional[list] = None,
        special_keys: Optional[list] = None,
    ) -> pd.DataFrame:
        """Join a dataframe that has keys on the index with metadata.

        Metadata fields are defined in x_fields.
        If columns are also keys (and not, e.g., method names), they can also
        be replaced with metadata, if y_fields are provided.

        Parameters
        ----------
        df : Simple DataFrame containing processed data
        x_fields : List of additional columns to add from the MetaDataStore
        y_fields : List of column keys for the data in the df dataframe
        special_keys : List of specific items to place at the top of the dataframe

        Returns
        -------
        Expanded and metadata-annotated dataframe

        """

        # Replace column keys with labels
        df.columns = cls.get_labels(df.columns, fields=y_fields)
        # Coerce index to MultiIndex if it currently isn't
        if not isinstance(df.index, pd.MultiIndex):
            df.index = pd.MultiIndex.from_tuples(ids_to_keys(df.index))

        # Get metadata for rows
        keys = [k for k in df.index if k in AB_metadata.index]
        metadata = AB_metadata.get_metadata(keys, x_fields)

        # Join data with metadata
        joined = metadata.join(df, how="outer")

        if special_keys:
            # Replace index keys with labels; first put Total, Rest (+) and
            # Rest (-) at the first three positions in the dataframe
            try:
                complete_index = special_keys + keys
                joined = joined.reindex(complete_index, axis="index", fill_value=0.0)
            except Exception:
                log.error(
                    "Could not put 'Total', 'Rest (+)' and 'Rest (-)' on positions 0, 1 and 2 in the dataframe."
                )
        joined.index = cls.get_labels(joined.index, fields=x_fields)
        return joined

    def get_labelled_contribution_dict(
        self,
        cont_dict: dict,
        x_fields: list = None,
        y_fields: list = None,
        mask: list = None,
    ) -> pd.DataFrame:
        """Annotate the contribution dict with metadata.

        Parameters
        ----------
        cont_dict : Holds the contribution data connected to the reference flows or methods
        x_fields : X-axis fieldnames, these are usually the indexes/keys of specific processes
        y_fields : Column names specific to the cont_dict to be labelled
        mask : Used in case of aggregation or special cases where the usual way of using the metadata cannot be used

        Returns
        -------
        Annotated contribution dict inside a pandas dataframe

        """
        dfs = (
            pd.DataFrame(v.values(), index=list(v.keys()), columns=[k])
            for k, v in cont_dict.items()
        )
        df = pd.concat(dfs, sort=False, axis=1)
        # If the cont_dict has tuples for keys, coerce df.columns into MultiIndex
        if all(isinstance(k, tuple) for k in cont_dict.keys()):
            df.columns = pd.MultiIndex.from_tuples(df.columns)
        special_keys = [("Total", ""), ("Rest (+)", ""), ("Rest (-)", "")]
        # Replace all 0 values with NaN; rows containing only NaNs are dropped below
        df = df.replace(0, np.nan)

        # Sort on the absolute mean of a row
        df_bot = deepcopy(df.loc[df.index.difference(special_keys)].dropna(how="all"))

        func = lambda row: np.nanmean(np.abs(row))
        if len(df_bot) > 1:  # but only sort if there is something to sort
            df_bot["_sort_me_"] = df_bot.select_dtypes(include=np.number).apply(func, axis=1)
            df_bot.sort_values(by="_sort_me_", ascending=False, inplace=True)
            del df_bot["_sort_me_"]

        df = pd.concat([df.iloc[:3, :], df_bot], axis=0)

        if not mask:
            joined = self.join_df_with_metadata(
                df, x_fields=x_fields, y_fields=y_fields, special_keys=special_keys
            )
        else:
            df.columns = self.get_labels(df.columns, fields=y_fields)
            keys = [k for k in df.index if k in mask]
            combined_keys = special_keys + keys
            # Reindex the combined_keys to ensure they always exist in the dataframe;
            # this avoids keys with 0 values not existing due to the 'dropna' action above.
            df = df.reindex(combined_keys, axis="index", fill_value=0.0)
            df.index = self.get_labels(df.index, mask=mask)
            joined = df
        if joined is not None:
            return joined.reset_index(drop=False)

    @staticmethod
    def adjust_table_unit(df: pd.DataFrame, method: Optional[tuple]) -> pd.DataFrame:
        """Given a dataframe, adjust the unit of the table to either match the given method, or not exist."""
        if "unit" not in df.columns:
            return df
        keys = df.index[~df["index"].isin({"Total", "Rest (+)", "Rest (-)"})]
        unit = bd.Method(method).metadata.get("unit") if method else "unit"
        df.loc[keys, "unit"] = unit
        return df

    @staticmethod
    def _build_inventory(
        inventory: dict, indices: dict, columns: list, fields: list
    ) -> pd.DataFrame:
        df = pd.DataFrame(inventory)
        df.index = pd.MultiIndex.from_tuples(ids_to_keys(indices.values()))
        df.columns = Contributions.get_labels(columns, max_length=30)
        metadata = AB_metadata.get_metadata(list(ids_to_keys(indices.values())), fields)
        joined = metadata.join(df)
        joined.reset_index(inplace=True, drop=True)
        return joined

    def inventory_df(
        self, inventory_type: str, columns: set = {"name", "database", "code"}
    ) -> pd.DataFrame:
        """Return an inventory dataframe with metadata of the given type."""
        try:
            data = self.inventory_data[inventory_type]
            appending = columns.difference(set(data[3]))
            for clmn in appending:
                data[3].append(clmn)
        except KeyError:
            raise ValueError(
                "Type must be either 'biosphere' or 'technosphere', "
                "'{}' given.".format(inventory_type)
            )
        return self._build_inventory(*data)

    def _build_lca_scores_df(self, scores: np.ndarray) -> pd.DataFrame:
        df = pd.DataFrame(
            scores,
            index=pd.MultiIndex.from_tuples(self.mlca.fu_activity_keys),
            columns=self.mlca.methods,
        )
        # Add amounts column.
        df["amount"] = [next(iter(fu.values()), 1.0) for fu in self.mlca.func_units]
        joined = Contributions.join_df_with_metadata(
            df, x_fields=self.act_fields, y_fields=None
        )
        # Precisely order the columns that are shown in the LCA Results overview
        # tab: "X kg of product Y from activity Z in location L, and database D"
        col_order = pd.Index(
            [
                "amount",
                "unit",
                "reference product",
                "name",
                "location",
                "database",
            ]
        )
        methods = joined.columns.difference(col_order, sort=False)
        joined = joined.loc[:, col_order.append(methods)]
        return joined.reset_index(drop=False)

    def lca_scores_df(self, normalized: bool = False) -> pd.DataFrame:
        """Return a metadata-annotated DataFrame of the LCA scores."""
        scores = (
            self.mlca.lca_scores if not normalized else self.mlca.lca_scores_normalized
        )
        return self._build_lca_scores_df(scores)

    @staticmethod
    def _build_contributions(data: np.ndarray, index: int, axis: int) -> np.ndarray:
        return data.take(index, axis=axis)

    def get_contributions(
        self, contribution, functional_unit=None, method=None, **kwargs
    ) -> np.ndarray:
        """Return a contribution matrix given the type and fu / method."""
        if all([functional_unit, method]) or not any([functional_unit, method]):
            raise ValueError(
                "It must be either by reference flow or by impact category. Provided:"
                "\n Reference flow: {} \n Impact Category: {}".format(
                    functional_unit, method
                )
            )
        dataset = {
            "process": self.mlca.process_contributions,
            "elementary_flow": self.mlca.elementary_flow_contributions,
        }
        if method:
            return self._build_contributions(
                dataset[contribution], self.mlca.method_index[method], 1
            )
        elif functional_unit:
            return self._build_contributions(
                dataset[contribution], self.mlca.func_key_dict[functional_unit], 0
            )
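
    # Slicing sketch: mlca.process_contributions has shape
    # (reference flows, methods, technosphere rows). Passing method takes its
    # index along axis 1, yielding a (reference flows, rows) matrix; passing
    # functional_unit takes axis 0, yielding (methods, rows). Exactly one of
    # the two must be given, which the all()/any() check above enforces.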

    def aggregate_by_parameters(
        self,
        contributions: np.ndarray,
        inventory: str,
        parameters: Union[str, list] = None,
    ):
        """Perform aggregation of the contribution data given parameters.

        Parameters
        ----------
        contributions : 2-dimensional contribution array
        inventory : Either 'biosphere' or 'technosphere', used to determine which inventory to use
        parameters : One or more parameters by which to aggregate the given contribution array

        Returns
        -------
        aggregated : np.ndarray
            The aggregated 2-dimensional contribution array
        mask_index : dict
            Contains all of the values of the aggregation mask, linked to their indexes
        mask : list or dictview or None
            An optional list or dictview of the mask_index values

        """
        rev_index, keys, fields = self.aggregate_data[inventory]
        if not parameters:
            return contributions, rev_index, None

        df = pd.DataFrame(contributions).T
        columns = list(range(contributions.shape[0]))
        df.index = pd.MultiIndex.from_tuples(rev_index.values())
        metadata = AB_metadata.get_metadata(list(keys), fields)

        joined = metadata.join(df)
        joined.reset_index(inplace=True, drop=True)
        grouped = joined.groupby(parameters)
        aggregated = grouped[columns].sum()
        mask_index = {i: m for i, m in enumerate(aggregated.index)}

        return aggregated.T.values, mask_index, mask_index.values()
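
    # For example (hypothetical metadata): aggregating a technosphere
    # contribution array with parameters="location" groups the flow rows by
    # their "location" metadata value and sums each group, so the flow axis
    # collapses to one entry per distinct location and mask_index maps those
    # positions to the location names.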

    def _contribution_rows(self, contribution: str, aggregator=None):
        if aggregator is None:
            return self.act_fields if contribution == self.ACT else self.ef_fields
        return aggregator if isinstance(aggregator, list) else [aggregator]

    def _correct_method_index(self, mthd_indx: list) -> dict:
        """Amend the tuples used as impact method labels so that all
        tuples are fully printed.

        NOTE: THE AMENDED TUPLES ARE COPIED; THIS SHOULD NOT BE USED TO
        ASSIGN OR MODIFY THE UNDERLYING DATA STRUCTURES!

        mthd_indx: a list of tuples of the impact method names
        """
        method_tuple_length = max(len(k) for k in mthd_indx)
        conv_dict = dict()
        for v, mthd in enumerate(mthd_indx):
            if len(mthd) < method_tuple_length:
                _l = list(mthd)
                for i in range(len(mthd), method_tuple_length):
                    _l.append("")
                mthd = tuple(_l)
            conv_dict[mthd] = v
        return conv_dict
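
    # For example (hypothetical method names): given
    # [("IPCC", "GWP100"), ("ReCiPe", "midpoint", "H")], the shorter tuple is
    # padded with empty strings to the longest length, yielding
    # {("IPCC", "GWP100", ""): 0, ("ReCiPe", "midpoint", "H"): 1}.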

    def _contribution_index_cols(self, **kwargs) -> (dict, Optional[Iterable]):
        if kwargs.get("method") is not None:
            return self.mlca.fu_index, self.act_fields
        return self._correct_method_index(self.mlca.methods), None

    def top_elementary_flow_contributions(
        self,
        functional_unit: Optional[tuple] = None,
        method: Optional[tuple] = None,
        aggregator: Union[str, list, None] = None,
        limit: int = 5,
        normalize: bool = False,
        limit_type: str = "number",
        total_range: bool = True,
        **kwargs,
    ) -> pd.DataFrame:
        """Return top EF contributions for either functional_unit or method.

        * If functional_unit: Compare the reference flow against all
          considered impact assessment methods.
        * If method: Compare the method against all involved processes.

        Parameters
        ----------
        functional_unit : The reference flow to compare all considered impact categories against
        method : The method to compare all considered reference flows against
        aggregator : Used to aggregate EF contributions over certain columns
        limit : The number of top contributions to consider
        normalize : Determines whether or not to normalize the contribution values
        limit_type : The type of limit, either 'number' or 'percent'
        total_range : Whether the total for contributions is the range (True) or the score (False)

        Returns
        -------
        Annotated top-contribution dataframe

        """
        contributions = self.get_contributions(
            self.EF, functional_unit, method, **kwargs
        )

        x_fields = self._contribution_rows(self.EF, aggregator)
        index, y_fields = self._contribution_index_cols(
            functional_unit=functional_unit, method=method
        )
        contributions, rev_index, mask = self.aggregate_by_parameters(
            contributions, self.BIOS, aggregator
        )

        # Normalize if required
        if normalize:
            contributions = self.normalize(contributions, total_range)

        top_cont_dict = self._build_dict(
            contributions, index, rev_index, limit, limit_type, total_range
        )
        labelled_df = self.get_labelled_contribution_dict(
            top_cont_dict, x_fields=x_fields, y_fields=y_fields, mask=mask
        )
        self.adjust_table_unit(labelled_df, method)
        return labelled_df

    def top_process_contributions(
        self,
        functional_unit: Optional[tuple] = None,
        method: Optional[tuple] = None,
        aggregator: Union[str, list, None] = None,
        limit: int = 5,
        normalize: bool = False,
        limit_type: str = "number",
        total_range: bool = True,
        **kwargs,
    ) -> pd.DataFrame:
        """Return top process contributions for functional_unit or method.

        * If functional_unit: Compare the reference flow against all
          considered impact assessment methods.
        * If method: Compare the method against all involved processes.

        Parameters
        ----------
        functional_unit : The reference flow to compare all considered impact categories against
        method : The method to compare all considered reference flows against
        aggregator : Used to aggregate process contributions over certain columns
        limit : The number of top contributions to consider
        normalize : Determines whether or not to normalize the contribution values
        limit_type : The type of limit, either 'number' or 'percent'
        total_range : Whether the total for contributions is the range (True) or the score (False)

        Returns
        -------
        Annotated top-contribution dataframe

        """
        contributions = self.get_contributions(
            self.ACT, functional_unit, method, **kwargs
        )

        x_fields = self._contribution_rows(self.ACT, aggregator)
        index, y_fields = self._contribution_index_cols(
            functional_unit=functional_unit, method=method
        )
        contributions, rev_index, mask = self.aggregate_by_parameters(
            contributions, self.TECH, aggregator
        )

        # Normalize if required
        if normalize:
            contributions = self.normalize(contributions, total_range)

        top_cont_dict = self._build_dict(
            contributions, index, rev_index, limit, limit_type, total_range
        )
        labelled_df = self.get_labelled_contribution_dict(
            top_cont_dict, x_fields=x_fields, y_fields=y_fields, mask=mask
        )
        self.adjust_table_unit(labelled_df, method)
        return labelled_df


def ids_to_keys(index_list):
    return [bd.get_activity(i).key if isinstance(i, int) else i for i in index_list]