LCA-ActivityBrowser / activity-browser / build 11146968379

02 Oct 2024 03:37PM UTC coverage: 53.511% (-0.7%) from 54.184%

Pull Request #1046: First-Tier contribution analysis tab
Merge a9ca09f15 into 05bad4148 (github / web-flow)

17 of 244 new or added lines in 4 files covered (6.97%).
3 existing lines in 3 files are now uncovered.
8368 of 15638 relevant lines covered (53.51%).
0.54 hits per line.

Source file: /activity_browser/bwutils/multilca.py (20.85% covered)

from collections import OrderedDict
from copy import deepcopy
from typing import Iterable, Optional, Tuple, Union
from logging import getLogger

import bw2analyzer as ba
import bw2calc as bc
import numpy as np
import pandas as pd
from PySide2.QtWidgets import QApplication, QMessageBox

from activity_browser.mod import bw2data as bd

from .commontasks import wrap_text
from .errors import ReferenceFlowValueError
from .metadata import AB_metadata

log = getLogger(__name__)
ca = ba.ContributionAnalysis()


class MLCA(object):
    """Wrapper class for performing LCA calculations with many reference flows and impact categories.

    Needs to be passed a brightway ``calculation_setup`` name.

    This class does not subclass the `LCA` class. The initial LCI is
    computed on instantiation; the LCA scores are computed when
    `calculate()` is called.

    Initialization creates `self.lca_scores`, a NumPy array with rows of
    reference flows and columns of impact categories, which is filled in by
    `calculate()`. Ordering is the same as in the `calculation_setup`.

    This class is adapted from `bw2calc.multi_lca.MultiLCA` and includes a
    number of additional attributes required to perform process- and
    elementary flow contribution analysis (see class `Contributions` below).

    Parameters
    ----------
    cs_name : str
        Name of the calculation setup

    Attributes
    ----------
    func_units_dict
    all_databases
    lca_scores_normalized
    func_units: list
        List of dictionaries, each containing the reference flow key and
        its required output
    fu_activity_keys: list
        The reference flow keys
    fu_index: dict
        Links the reference flows to a specific index
    rev_fu_index: dict
        Same as `fu_index` but using the indexes as keys
    methods: list
        The impact categories of the calculation setup
    method_index: dict
        Links the impact categories to a specific index
    rev_method_index: dict
        Same as `method_index` but using the indexes as keys
    lca: `bw2calc.lca.LCA`
        Brightway LCA instance used to perform LCA, LCI and LCIA
        calculations
    method_matrices: list
        Contains the characterization matrix for each impact category.
    lca_scores: `numpy.ndarray`
        2-dimensional array of shape (`func_units`, `methods`) holding the
        calculated LCA scores of each combination of reference flow and
        impact assessment method
    rev_activity_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    rev_product_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    rev_biosphere_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    scaling_factors: dict
        Contains the life-cycle inventory scaling factors per reference flow
    technosphere_flows: dict
        Contains the calculated technosphere flows per reference flow
    inventory: dict
        Life cycle inventory (biosphere flows) per reference flow
    inventories: dict
        Biosphere flows per reference flow and impact category combination
    characterized_inventories: dict
        Inventory multiplied by scaling (relative impact on environment) per
        reference flow and impact category combination
    elementary_flow_contributions: `numpy.ndarray`
        3-dimensional array of shape (`func_units`, `methods`, `biosphere`)
        which holds the characterized inventory results summed along the
        technosphere axis
    process_contributions: `numpy.ndarray`
        3-dimensional array of shape (`func_units`, `methods`, `technosphere`)
        which holds the characterized inventory results summed along the
        biosphere axis
    func_unit_translation_dict: dict
        Maps a human-readable activity label (name | reference product |
        location | database | amount) to its reference flow
    func_key_dict: dict
        An index of the brightway activity labels
    func_key_list: list
        A derivative of `func_key_dict` containing just the keys

    Raises
    ------
    ValueError
        If the given `cs_name` cannot be found in brightway calculation_setups

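    Examples
    --------
    A minimal sketch, assuming a calculation setup named "my setup" has
    been saved in ``bd.calculation_setups``::

        mlca = MLCA("my setup")
        mlca.calculate()
        scores = mlca.lca_scores  # shape: (reference flows, impact categories)
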
    """

    def __init__(self, cs_name: str):
        try:
            cs = bd.calculation_setups[cs_name]
        except KeyError:
            raise ValueError(f"{cs_name} is not a known `calculation_setup`.")

        # Check that all values are non-zero:
        # cs['inv'] contains all reference flows (rf);
        # the values of each rf are the individual reference flow amounts.
        if [v for rf in cs["inv"] for v in rf.values() if v == 0]:
            msg = QMessageBox()
            msg.setWindowTitle("Reference flows equal 0")
            msg.setText("All reference flows must be non-zero.")
            msg.setInformativeText(
                "Please enter a valid value before calculating LCA results again."
            )
            msg.setIcon(QMessageBox.Warning)
            QApplication.restoreOverrideCursor()
            msg.exec_()
            raise ReferenceFlowValueError("Reference flow == 0")

        # Reference flows and related indexes
        self.func_units = cs["inv"]
        self.fu_activity_keys = [list(fu.keys())[0] for fu in self.func_units]
        self.fu_index = {k: i for i, k in enumerate(self.fu_activity_keys)}
        self.rev_fu_index = {v: k for k, v in self.fu_index.items()}

        # Methods and related indexes
        self.methods = cs["ia"]
        self.method_index = {m: i for i, m in enumerate(self.methods)}
        self.rev_method_index = {v: k for k, v in self.method_index.items()}

        # Initial LCA and preparation of the method matrices
        self.lca = self._construct_lca()
        self.lca.lci(factorize=True)
        self.method_matrices = []
        for method in self.methods:
            self.lca.switch_method(method)
            self.method_matrices.append(self.lca.characterization_matrix)

        self.lca_scores = np.zeros((len(self.func_units), len(self.methods)))

        # Data to be stored
        (self.rev_activity_dict, self.rev_product_dict, self.rev_biosphere_dict) = (
            self.lca.reverse_dict()
        )

        # Scaling factors per reference flow
        self.scaling_factors = dict()

        # Technosphere product flows for a given reference flow
        self.technosphere_flows = dict()
        # Life cycle inventory (biosphere flows) by reference flow
        self.inventory = dict()
        # Inventory (biosphere flows) for a specific reference flow (e.g. 2000x15000) and impact category
        self.inventories = dict()
        # Inventory multiplied by scaling (relative impact on environment) per impact category
        self.characterized_inventories = dict()

        # Summarized contributions for elementary flows and processes
        self.elementary_flow_contributions = np.zeros(
            (
                len(self.func_units),
                len(self.methods),
                self.lca.biosphere_matrix.shape[0],
            )
        )
        self.process_contributions = np.zeros(
            (
                len(self.func_units),
                len(self.methods),
                self.lca.technosphere_matrix.shape[0],
            )
        )

        self.func_unit_translation_dict = {}
        for fu in self.func_units:
            key = next(iter(fu))
            amount = fu[key]
            act = bd.get_activity(key)
            self.func_unit_translation_dict[
                (
                    f'{act["name"]} | '
                    f'{act["reference product"]} | '
                    f'{act["location"]} | '
                    f'{act["database"]} | '
                    f"{amount}"
                )
            ] = fu
        self.func_key_dict = {
            m: i for i, m in enumerate(self.func_unit_translation_dict.keys())
        }
        self.func_key_list = list(self.func_unit_translation_dict.keys())

    def _construct_lca(self):
        return bc.LCA(demand=self.func_units_dict, method=self.methods[0])

    def _perform_calculations(self):
        """Isolates the code which performs calculations to allow subclasses
        to either alter the code or redo calculations after matrix substitution.
        """
        for row, func_unit in enumerate(self.func_units):
            # Do the LCA for the current reference flow
            try:
                self.lca.redo_lci(func_unit)
            except Exception:
                # bw25 compatibility: the demand must be keyed by integer activity id
                key = list(func_unit.keys())[0]
                self.lca.redo_lci({bd.get_activity(key).id: func_unit[key]})

            # Now update the:
            # - Scaling factors
            # - Technosphere flows
            # - Life cycle inventory
            # - Life-cycle inventory (disaggregated by contributing process)
            # for the current reference flow
            self.scaling_factors.update({str(func_unit): self.lca.supply_array})
            self.technosphere_flows.update(
                {
                    str(func_unit): np.multiply(
                        self.lca.supply_array, self.lca.technosphere_matrix.diagonal()
                    )
                }
            )
            self.inventory.update(
                {str(func_unit): np.array(self.lca.inventory.sum(axis=1)).ravel()}
            )
            self.inventories.update({str(func_unit): self.lca.inventory})

            # Now, for each method, take the current reference flow and do the impact assessment
            for col, cf_matrix in enumerate(self.method_matrices):
                self.lca.characterization_matrix = cf_matrix
                self.lca.lcia_calculation()
                self.lca_scores[row, col] = self.lca.score
                self.characterized_inventories[row, col] = (
                    self.lca.characterized_inventory.copy()
                )
                self.elementary_flow_contributions[row, col] = np.array(
                    self.lca.characterized_inventory.sum(axis=1)
                ).ravel()
                self.process_contributions[row, col] = (
                    self.lca.characterized_inventory.sum(axis=0)
                )

    def calculate(self):
        self._perform_calculations()

    @property
    def func_units_dict(self) -> dict:
        """Return a dictionary mapping each reference-flow key to a demand of 1."""
        return {key: 1 for func_unit in self.func_units for key in func_unit}

    @property
    def all_databases(self) -> set:
        """Get all databases linked to the reference flows."""

        def get_dependents(dbs: set, dependents: list) -> set:
            for dep in (bd.databases[db].get("depends", []) for db in dependents):
                if not dbs.issuperset(dep):
                    dbs = get_dependents(dbs.union(dep), dep)
            return dbs

        dbs = set(f[0] for f in self.fu_activity_keys)
        dbs = get_dependents(dbs, list(dbs))
        # In rare cases, the default biosphere is not found as a dependency, see:
        # https://github.com/LCA-ActivityBrowser/activity-browser/issues/298
        # Always include it.
        dbs.add(bd.config.biosphere)
        return dbs

    def get_results_for_method(self, index: int = 0) -> pd.DataFrame:
        """Return the LCA scores of one impact category, indexed by reference-flow key."""
        data = self.lca_scores[:, index]
        return pd.DataFrame(data, index=self.fu_activity_keys)

    @property
    def lca_scores_normalized(self) -> np.ndarray:
        """Normalize LCA scores per impact assessment method (column-wise maximum)."""
        return self.lca_scores / self.lca_scores.max(axis=0)

    def get_normalized_scores_df(self) -> pd.DataFrame:
        """To be used for the currently inactive CorrelationPlot."""
        labels = [str(x + 1) for x in range(len(self.func_units))]
        return pd.DataFrame(data=self.lca_scores_normalized.T, columns=labels)

    def lca_scores_to_dataframe(self) -> pd.DataFrame:
        """Return a dataframe of LCA scores using reference-flow keys as index
        and methods as columns.
        """
        return pd.DataFrame(
            data=self.lca_scores,
            index=pd.Index(self.fu_activity_keys),
            columns=pd.Index(self.methods),
        )

    def get_all_metadata(self) -> None:
        """Populate AB_metadata with relevant database values.

        Set metadata in the form of a Pandas DataFrame for biosphere and
        technosphere databases, for tables and additional aggregation.
        """
        AB_metadata.add_metadata(self.all_databases)


class Contributions(object):
    """Contribution analysis built on top of the `MLCA` class.

    This class requires instantiated MLCA and MetaDataStore objects.

    Parameters
    ----------
    mlca : `MLCA`
        An instantiated MLCA object

    Attributes
    ----------
    DEFAULT_ACT_FIELDS : list
        Default activity/reference flow column names
    DEFAULT_EF_FIELDS : list
        Default environmental flow column names
    mlca: `MLCA`
        Linked `MLCA` instance used for contribution calculations
    act_fields: list
        Technosphere-specific metadata column names
    ef_fields: list
        Biosphere-specific metadata column names

    Raises
    ------
    ValueError
        If the given `mlca` object is not an instance of `MLCA`

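    Examples
    --------
    A minimal sketch, assuming `mlca` is a calculated `MLCA` instance and
    `method` is one of its impact categories::

        contributions = Contributions(mlca)
        df = contributions.top_process_contributions(
            method=method, limit=5, limit_type="number"
        )
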
    """

    ACT = "process"
    EF = "elementary_flow"
    TECH = "technosphere"
    BIOS = "biosphere"

    DEFAULT_ACT_FIELDS = ["reference product", "name", "location", "unit", "database"]
    DEFAULT_EF_FIELDS = ["name", "categories", "type", "unit", "database"]

    DEFAULT_ACT_AGGREGATES = ["none"] + DEFAULT_ACT_FIELDS
    DEFAULT_EF_AGGREGATES = ["none"] + DEFAULT_EF_FIELDS

    def __init__(self, mlca):
        if not isinstance(mlca, MLCA):
            raise ValueError("Must pass an MLCA object. Passed:", type(mlca))
        self.mlca = mlca
        # Ensure MetaDataStore is updated.
        self.mlca.get_all_metadata()

        # Set default metadata keys (those not in the dataframe will be eliminated)
        self.act_fields = AB_metadata.get_existing_fields(self.DEFAULT_ACT_FIELDS)
        self.ef_fields = AB_metadata.get_existing_fields(self.DEFAULT_EF_FIELDS)

        # Specific datastructures for retrieving relevant MLCA data
        # inventory: inventory, reverse index, metadata keys, metadata fields
        self.inventory_data = {
            "biosphere": (
                self.mlca.inventory,
                self.mlca.rev_biosphere_dict,
                self.mlca.fu_activity_keys,
                self.ef_fields,
            ),
            "technosphere": (
                self.mlca.technosphere_flows,
                self.mlca.rev_activity_dict,
                self.mlca.fu_activity_keys,
                self.act_fields,
            ),
        }
        # aggregation: reverse index, metadata keys, metadata fields
        self.aggregate_data = {
            "biosphere": (
                self.mlca.rev_biosphere_dict,
                self.mlca.lca.biosphere_dict,
                self.ef_fields,
            ),
            "technosphere": (
                self.mlca.rev_activity_dict,
                self.mlca.lca.activity_dict,
                self.act_fields,
            ),
        }

    def normalize(self, contribution_array: np.ndarray) -> np.ndarray:
        """Normalise the contribution array.

        Parameters
        ----------
        contribution_array : A 2-dimensional contribution array

        Returns
        -------
        2-dimensional array of the same shape, with each row divided by the
        absolute value of its row total.

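        Examples
        --------
        Each row is divided by the absolute value of its row total, e.g.
        ``[[2., 2.], [1., -3.]]`` becomes ``[[0.5, 0.5], [0.5, -1.5]]``.
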
        """
        scores = abs(contribution_array.sum(axis=1, keepdims=True))
        return contribution_array / scores

    def _build_dict(
        self,
        contributions: np.ndarray,
        FU_M_index: dict,
        rev_dict: dict,
        limit: int,
        limit_type: str,
    ) -> dict:
        """Sort the given contribution array on method or reference flow column.

        Parameters
        ----------
        contributions : A 2-dimensional contribution array
        FU_M_index : Dictionary which maps the reference flows or methods to their matching columns
        rev_dict : 'Reverse' dictionary used to map the correct activity/method to its value
        limit : Number of top-contributing items to include
        limit_type : Either "number" or "percent"; see ContributionAnalysis.sort_array for a complete explanation

        Returns
        -------
        Top-contributing flows per method or activity

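        Examples
        --------
        A sketch of the returned structure (one entry per reference flow or
        method in `FU_M_index`)::

            {fu_or_method: OrderedDict({
                ("Total", ""): total,
                ("Rest (+)", ""): pos_rest,
                ("Rest (-)", ""): neg_rest,
                rev_dict[index]: value,  # one pair per top contribution
            })}
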
        """
        topcontribution_dict = dict()
        for fu_or_method, col in FU_M_index.items():
            contribution_col = contributions[col, :]
            total = contribution_col.sum()

            top_contribution = ca.sort_array(
                contribution_col, limit=limit, limit_type=limit_type, total=total
            )

            # Split and calculate the remaining "rest" sections for the positive and negative parts
            pos_rest = (
                np.sum(contribution_col[contribution_col > 0])
                - np.sum(top_contribution[top_contribution[:, 0] > 0][:, 0])
            )
            neg_rest = (
                np.sum(contribution_col[contribution_col < 0])
                - np.sum(top_contribution[top_contribution[:, 0] < 0][:, 0])
            )

            cont_per = OrderedDict()
            cont_per.update(
                {
                    ("Total", ""): total,
                    ("Rest (+)", ""): pos_rest,
                    ("Rest (-)", ""): neg_rest,
                }
            )
            for value, index in top_contribution:
                cont_per.update({rev_dict[index]: value})
            topcontribution_dict.update({fu_or_method: cont_per})
        return topcontribution_dict

    @staticmethod
    def get_labels(
        key_list: pd.MultiIndex,
        fields: Optional[list] = None,
        separator: str = " | ",
        max_length: Optional[int] = None,
        mask: Optional[list] = None,
    ) -> list:
        """Generate labels from metadata information.

        Setting max_length will wrap the label into a multi-line string if
        its size is larger than max_length.

        Parameters
        ----------
        key_list : An index containing 'keys' to be retrieved from the MetaDataStore
        fields : List of column names to be included from the MetaDataStore
        separator : Specific separator to use when joining strings together
        max_length : Allowed character length before a string is wrapped over multiple lines
        mask : Instead of the metadata, this list is used to check keys against.
            Use if data is aggregated or keys do not exist in the MetaDataStore

        Returns
        -------
        Translated and/or joined (and wrapped) labels matching the keys

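        Examples
        --------
        An activity key found in the MetaDataStore is joined into a single
        string such as "name | reference product | location | database";
        with max_length set, long labels are wrapped via `wrap_text`.
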
        """
        fields = (
            fields if fields else ["name", "reference product", "location", "database"]
        )
        keys = (
            k for k in key_list
        )  # need to do this as the keys come from a pd.MultiIndex
        translated_keys = []
        for k in keys:
            if mask and k in mask:
                translated_keys.append(k)
            elif isinstance(k, str):
                translated_keys.append(k)
            elif k in AB_metadata.index:
                translated_keys.append(
                    separator.join(
                        [str(l) for l in list(AB_metadata.get_metadata(k, fields))]
                    )
                )
            else:
                translated_keys.append(separator.join([i for i in k if i != ""]))
        if max_length:
            translated_keys = [
                wrap_text(k, max_length=max_length) for k in translated_keys
            ]
        return translated_keys

    @classmethod
    def join_df_with_metadata(
        cls,
        df: pd.DataFrame,
        x_fields: Optional[list] = None,
        y_fields: Optional[list] = None,
        special_keys: Optional[list] = None,
    ) -> pd.DataFrame:
        """Join a dataframe that has keys on the index with metadata.

        Metadata fields are defined in x_fields.
        If columns are also keys (and not, e.g., method names), they can also
        be replaced with metadata, if y_fields are provided.

        Parameters
        ----------
        df : Simple DataFrame containing processed data
        x_fields : List of additional columns to add from the MetaDataStore
        y_fields : List of column keys for the data in the df dataframe
        special_keys : List of specific items to place at the top of the dataframe

        Returns
        -------
        Expanded and metadata-annotated dataframe

        """

        # replace column keys with labels
        df.columns = cls.get_labels(df.columns, fields=y_fields)
        # Coerce index to MultiIndex if it currently isn't
        if not isinstance(df.index, pd.MultiIndex):
            df.index = pd.MultiIndex.from_tuples(ids_to_keys(df.index))

        # get metadata for rows
        keys = [k for k in df.index if k in AB_metadata.index]
        metadata = AB_metadata.get_metadata(keys, x_fields)

        # join data with metadata
        joined = metadata.join(df, how="outer")

        if special_keys:
            # First put Total, Rest (+) and Rest (-) in the first three positions in the dataframe
            try:
                complete_index = special_keys + keys
                joined = joined.reindex(complete_index, axis="index", fill_value=0.0)
            except Exception:
                log.error(
                    "Could not put 'Total', 'Rest (+)' and 'Rest (-)' on positions 0, 1 and 2 in the dataframe."
                )
        joined.index = cls.get_labels(joined.index, fields=x_fields)
        return joined

    def get_labelled_contribution_dict(
        self,
        cont_dict: dict,
        x_fields: Optional[list] = None,
        y_fields: Optional[list] = None,
        mask: Optional[list] = None,
    ) -> pd.DataFrame:
        """Annotate the contribution dict with metadata.

        Parameters
        ----------
        cont_dict : Holds the contribution data, keyed by reference flow or method
        x_fields : X-axis field names, usually the indexes/keys of specific processes
        y_fields : Column names specific to the cont_dict to be labelled
        mask : Used for aggregation or special cases where the metadata cannot be used in the usual way

        Returns
        -------
        Annotated contribution dict inside a pandas dataframe

        """
        dfs = (
            pd.DataFrame(v.values(), index=list(v.keys()), columns=[k])
            for k, v in cont_dict.items()
        )
        df = pd.concat(dfs, sort=False, axis=1)
        # If the cont_dict has tuples for keys, coerce df.columns into MultiIndex
        if all(isinstance(k, tuple) for k in cont_dict.keys()):
            df.columns = pd.MultiIndex.from_tuples(df.columns)
        special_keys = [("Total", ""), ("Rest (+)", ""), ("Rest (-)", "")]

        # replace all 0 values with NaN and drop all rows with only NaNs
        # EXCEPT for the special keys
        df.index = ids_to_keys(df.index)
        index = (
            df.loc[df.index.difference(special_keys)]
            .replace(0, np.nan)
            .dropna(how="all")
            .index.union(special_keys)
        )
        df = df.loc[index]

        # Sort on the absolute mean of each row, keeping the three special
        # rows (Total, Rest (+), Rest (-)) at the top
        df_bot = deepcopy(df.iloc[3:, :])

        func = lambda row: np.nanmean(np.abs(row))
        if len(df_bot) > 1:  # but only sort if there is something to sort
            df_bot["_sort_me_"] = df_bot.select_dtypes(include=np.number).apply(
                func, axis=1
            )
            df_bot.sort_values(by="_sort_me_", ascending=False, inplace=True)
            del df_bot["_sort_me_"]

        df = pd.concat([df.iloc[:3, :], df_bot], axis=0)

        if not mask:
            joined = self.join_df_with_metadata(
                df, x_fields=x_fields, y_fields=y_fields, special_keys=special_keys
            )
        else:
            df.columns = self.get_labels(df.columns, fields=y_fields)
            keys = [k for k in df.index if k in mask]
            combined_keys = special_keys + keys
            # Reindex the combined_keys to ensure they always exist in the dataframe;
            # this avoids keys with 0 values not existing due to the 'dropna' action above.
            df = df.reindex(combined_keys, axis="index", fill_value=0.0)
            df.index = self.get_labels(df.index, mask=mask)
            joined = df
        if joined is not None:
            return joined.reset_index(drop=False)

    @staticmethod
    def adjust_table_unit(df: pd.DataFrame, method: Optional[tuple]) -> pd.DataFrame:
        """Set the 'unit' column to the given method's unit, or to the placeholder string 'unit' when no method is given."""
        if "unit" not in df.columns:
            return df
        keys = df.index[~df["index"].isin({"Total", "Rest (+)", "Rest (-)"})]
        unit = bd.Method(method).metadata.get("unit") if method else "unit"
        df.loc[keys, "unit"] = unit
        return df

    @staticmethod
    def _build_inventory(
        inventory: dict, indices: dict, columns: list, fields: list
    ) -> pd.DataFrame:
        df = pd.DataFrame(inventory)
        df.index = pd.MultiIndex.from_tuples(ids_to_keys(indices.values()))
        df.columns = Contributions.get_labels(columns, max_length=30)
        metadata = AB_metadata.get_metadata(list(ids_to_keys(indices.values())), fields)
        joined = metadata.join(df)
        joined.reset_index(inplace=True, drop=True)
        return joined

    def inventory_df(
        self, inventory_type: str, columns: set = {"name", "database", "code"}
    ) -> pd.DataFrame:
        """Return an inventory dataframe with metadata of the given type."""
        try:
            data = self.inventory_data[inventory_type]
            appending = columns.difference(set(data[3]))
            for clmn in appending:
                data[3].append(clmn)
        except KeyError:
            raise ValueError(
                "Type must be either 'biosphere' or 'technosphere', "
                "'{}' given.".format(inventory_type)
            )
        return self._build_inventory(*data)

    def _build_lca_scores_df(self, scores: np.ndarray) -> pd.DataFrame:
        df = pd.DataFrame(
            scores,
            index=pd.MultiIndex.from_tuples(self.mlca.fu_activity_keys),
            columns=self.mlca.methods,
        )
        # Add amounts column.
        df["amount"] = [next(iter(fu.values()), 1.0) for fu in self.mlca.func_units]
        joined = Contributions.join_df_with_metadata(
            df, x_fields=self.act_fields, y_fields=None
        )
        # Precisely order the columns that are shown in the LCA Results overview
        # tab: “X kg of product Y from activity Z in location L, and database D”
        col_order = pd.Index(
            [
                "amount",
                "unit",
                "reference product",
                "name",
                "location",
                "database",
            ]
        )
        methods = joined.columns.difference(col_order, sort=False)
        joined = joined.loc[:, col_order.append(methods)]
        return joined.reset_index(drop=False)

    def lca_scores_df(self, normalized: bool = False) -> pd.DataFrame:
        """Return a metadata-annotated DataFrame of the LCA scores."""
        scores = (
            self.mlca.lca_scores if not normalized else self.mlca.lca_scores_normalized
        )
        return self._build_lca_scores_df(scores)

    @staticmethod
    def _build_contributions(data: np.ndarray, index: int, axis: int) -> np.ndarray:
        return data.take(index, axis=axis)

    def get_contributions(
        self, contribution, functional_unit=None, method=None, **kwargs
    ) -> np.ndarray:
        """Return a contribution matrix given the type and exactly one of functional_unit or method."""
        if all([functional_unit, method]) or not any([functional_unit, method]):
            raise ValueError(
                "It must be either by reference flow or by impact category. Provided:"
                "\n Reference flow: {} \n Impact Category: {}".format(
                    functional_unit, method
                )
            )
        dataset = {
            "process": self.mlca.process_contributions,
            "elementary_flow": self.mlca.elementary_flow_contributions,
        }
        if method:
            return self._build_contributions(
                dataset[contribution], self.mlca.method_index[method], 1
            )
        elif functional_unit:
            return self._build_contributions(
                dataset[contribution], self.mlca.func_key_dict[functional_unit], 0
            )

    def aggregate_by_parameters(
        self,
        contributions: np.ndarray,
        inventory: str,
        parameters: Union[str, list, None] = None,
    ):
        """Aggregate the contribution data by the given parameters.

        Parameters
        ----------
        contributions : 2-dimensional contribution array
        inventory : Either 'biosphere' or 'technosphere'; used to determine which inventory to use
        parameters : One or more parameters by which to aggregate the given contribution array

        Returns
        -------
        aggregated : np.ndarray
            The aggregated 2-dimensional contribution array
        mask_index : dict
            Contains all of the values of the aggregation mask, linked to their indexes
        mask : list or dictview or None
            An optional list or dictview of the mask_index values

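        Examples
        --------
        A sketch, assuming `c` is a `Contributions` instance and `arr` is a
        contribution array from `get_contributions` ('location' is one of the
        default technosphere metadata fields)::

            agg, mask_index, mask = c.aggregate_by_parameters(
                arr, "technosphere", "location"
            )
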
        """
        rev_index, keys, fields = self.aggregate_data[inventory]
        if not parameters:
            return contributions, rev_index, None

        df = pd.DataFrame(contributions).T
        columns = list(range(contributions.shape[0]))
        df.index = pd.MultiIndex.from_tuples(rev_index.values())
        metadata = AB_metadata.get_metadata(list(keys), fields)

        joined = metadata.join(df)
        joined.reset_index(inplace=True, drop=True)
        grouped = joined.groupby(parameters)
        aggregated = grouped[columns].sum()
        mask_index = {i: m for i, m in enumerate(aggregated.index)}

        return aggregated.T.values, mask_index, mask_index.values()

    def _contribution_rows(self, contribution: str, aggregator=None):
        if aggregator is None:
            return self.act_fields if contribution == self.ACT else self.ef_fields
        return aggregator if isinstance(aggregator, list) else [aggregator]

    def _correct_method_index(self, mthd_indx: list) -> dict:
        """A method for amending the tuples of impact method labels so
        that all tuples are fully printed.

        NOTE: THE AMENDED TUPLES ARE COPIES; THIS SHOULD NOT BE USED TO
        ASSIGN OR MODIFY THE UNDERLYING DATA STRUCTURES!

        mthd_indx: a list of tuples for the impact method names
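
        E.g. with a maximum tuple length of 3, a two-item method label such
        as ('IPCC', 'GWP100') is padded to ('IPCC', 'GWP100', '') before
        being mapped to its position index.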
        """
        method_tuple_length = max([len(k) for k in mthd_indx])
        conv_dict = dict()
        for v, mthd in enumerate(mthd_indx):
            if len(mthd) < method_tuple_length:
                _l = list(mthd)
                for i in range(len(mthd), method_tuple_length):
                    _l.append("")
                mthd = tuple(_l)
            conv_dict[mthd] = v
        return conv_dict

    def _contribution_index_cols(self, **kwargs) -> Tuple[dict, Optional[Iterable]]:
        if kwargs.get("method") is not None:
            return self.mlca.fu_index, self.act_fields
        return self._correct_method_index(self.mlca.methods), None

    def top_elementary_flow_contributions(
        self,
        functional_unit: Optional[tuple] = None,
        method: Optional[tuple] = None,
        aggregator: Union[str, list, None] = None,
        limit: int = 5,
        normalize: bool = False,
        limit_type: str = "number",
        **kwargs,
    ) -> pd.DataFrame:
        """Return top elementary flow contributions for either functional_unit or method.

        * If functional_unit: compare the reference flow against all
          considered impact assessment methods.
        * If method: compare the method against all involved processes.

        Parameters
        ----------
        functional_unit : The reference flow to compare all considered impact categories against
        method : The method to compare all considered reference flows against
        aggregator : Used to aggregate the EF contributions over certain columns
        limit : The number of top contributions to consider
        normalize : Determines whether or not to normalize the contribution values
        limit_type : The type of limit, either 'number' or 'percent'

        Returns
        -------
        Annotated top-contribution dataframe

        """
        contributions = self.get_contributions(
            self.EF, functional_unit, method, **kwargs
        )

        x_fields = self._contribution_rows(self.EF, aggregator)
        index, y_fields = self._contribution_index_cols(
            functional_unit=functional_unit, method=method
        )
        contributions, rev_index, mask = self.aggregate_by_parameters(
            contributions, self.BIOS, aggregator
        )

        # Normalise if required
        if normalize:
            contributions = self.normalize(contributions)

        top_cont_dict = self._build_dict(
            contributions, index, rev_index, limit, limit_type
        )
        labelled_df = self.get_labelled_contribution_dict(
            top_cont_dict, x_fields=x_fields, y_fields=y_fields, mask=mask
        )
        self.adjust_table_unit(labelled_df, method)
        return labelled_df

    def top_process_contributions(
        self,
        functional_unit: Optional[tuple] = None,
        method: Optional[tuple] = None,
        aggregator: Union[str, list, None] = None,
        limit: int = 5,
        normalize: bool = False,
        limit_type: str = "number",
        **kwargs,
    ) -> pd.DataFrame:
        """Return top process contributions for functional_unit or method.

        * If functional_unit: compare the reference flow against all
          considered impact assessment methods.
        * If method: compare the method against all involved processes.

        Parameters
        ----------
        functional_unit : The reference flow to compare all considered impact categories against
        method : The method to compare all considered reference flows against
        aggregator : Used to aggregate the process contributions over certain columns
        limit : The number of top contributions to consider
        normalize : Determines whether or not to normalize the contribution values
        limit_type : The type of limit, either 'number' or 'percent'

        Returns
        -------
        Annotated top-contribution dataframe

        """
        contributions = self.get_contributions(
            self.ACT, functional_unit, method, **kwargs
        )

        x_fields = self._contribution_rows(self.ACT, aggregator)
        index, y_fields = self._contribution_index_cols(
            functional_unit=functional_unit, method=method
        )
        contributions, rev_index, mask = self.aggregate_by_parameters(
            contributions, self.TECH, aggregator
        )

        # Normalise if required
        if normalize:
            contributions = self.normalize(contributions)

        top_cont_dict = self._build_dict(
            contributions, index, rev_index, limit, limit_type
        )
        labelled_df = self.get_labelled_contribution_dict(
            top_cont_dict, x_fields=x_fields, y_fields=y_fields, mask=mask
        )
        self.adjust_table_unit(labelled_df, method)
        return labelled_df


def ids_to_keys(index_list):
    """Convert integer activity ids in the given index to activity keys, leaving other entries unchanged."""
    return [bd.get_activity(i).key if isinstance(i, int) else i for i in index_list]