LCA-ActivityBrowser / activity-browser / build 12159423782

04 Dec 2024 12:09PM UTC coverage: 53.463% (-0.7%) from 54.185%

Pull Request #1046: First-Tier contribution analysis tab
Merge d11b5065f into 023eb5a8d (github / web-flow)

17 of 255 new or added lines in 4 files covered (6.67%).
3 existing lines in 3 files are now uncovered.
8368 of 15652 relevant lines covered (53.46%).
0.53 hits per line.

Source file: /activity_browser/bwutils/multilca.py (20.98% covered)

from collections import OrderedDict
from copy import deepcopy
from typing import Iterable, Optional, Union
from logging import getLogger

import bw2analyzer as ba
import bw2calc as bc
import numpy as np
import pandas as pd
from PySide2.QtWidgets import QApplication, QMessageBox

from activity_browser.mod import bw2data as bd

from .commontasks import wrap_text
from .errors import ReferenceFlowValueError
from .metadata import AB_metadata

log = getLogger(__name__)
ca = ba.ContributionAnalysis()


class MLCA(object):
    """Wrapper class for performing LCA calculations with many reference flows and impact categories.

    Needs to be passed a brightway ``calculation_setup`` name.

    This class does not subclass the `LCA` class, and performs all
    calculations upon instantiation.

    Initialization creates `self.lca_scores`, which is a NumPy array
    of LCA scores, with rows of reference flows and columns of impact categories.
    Ordering is the same as in the `calculation_setup`.

    This class is adapted from `bw2calc.multi_lca.MultiLCA` and includes a
    number of additional attributes required to perform process- and
    elementary flow contribution analysis (see class `Contributions` below).

    Parameters
    ----------
    cs_name : str
        Name of the calculation setup

    Attributes
    ----------
    func_units_dict
    all_databases
    lca_scores_normalized
    func_units: list
        List of dictionaries, each containing the reference flow key and
        its required output
    fu_activity_keys: list
        The reference flow keys
    fu_index: dict
        Links the reference flows to a specific index
    rev_fu_index: dict
        Same as `fu_index` but using the indexes as keys
    methods: list
        The impact categories of the calculation setup
    method_index: dict
        Links the impact categories to a specific index
    rev_method_index: dict
        Same as `method_index` but using the indexes as keys
    lca: `bw2calc.lca.LCA`
        Brightway LCA instance used to perform LCA, LCI and LCIA
        calculations
    method_matrices: list
        Contains the characterization matrix for each impact category.
    lca_scores: `numpy.ndarray`
        2-dimensional array of shape (`func_units`, `methods`) holding the
        calculated LCA scores of each combination of reference flow and
        impact assessment method
    rev_activity_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    rev_product_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    rev_biosphere_dict: dict
        See `bw2calc.lca.LCA.reverse_dict`
    scaling_factors: dict
        Contains the life-cycle inventory scaling factors per reference flow
    technosphere_flows: dict
        Contains the calculated technosphere flows per reference flow
    inventory: dict
        Life cycle inventory (biosphere flows) per reference flow
    inventories: dict
        Biosphere flows per reference flow and impact category combination
    characterized_inventories: dict
        Inventory multiplied by scaling (relative impact on environment) per
        reference flow and impact category combination
    elementary_flow_contributions: `numpy.ndarray`
        3-dimensional array of shape (`func_units`, `methods`, `biosphere`)
        which holds the characterized inventory results summed along the
        technosphere axis
    process_contributions: `numpy.ndarray`
        3-dimensional array of shape (`func_units`, `methods`, `technosphere`)
        which holds the characterized inventory results summed along the
        biosphere axis
    func_unit_translation_dict: dict
        Contains the reference flow key and its expected output linked to
        the brightway activity label.
    func_key_dict: dict
        An index of the brightway activity labels
    func_key_list: list
        A derivative of `func_key_dict` containing just the keys

    Raises
    ------
    ValueError
        If the given `cs_name` cannot be found in brightway calculation_setups

    """

    def __init__(self, cs_name: str):
        try:
            cs = bd.calculation_setups[cs_name]
        except KeyError:
            raise ValueError(f"{cs_name} is not a known `calculation_setup`.")

        # check if all values are non-zero
        # cs['inv'] contains all reference flows (rf),
        # all values of rf are the individual reference flow items.
        if [v for rf in cs["inv"] for v in rf.values() if v == 0]:
            msg = QMessageBox()
            msg.setWindowTitle("Reference flows equal 0")
            msg.setText("All reference flows must be non-zero.")
            msg.setInformativeText(
                "Please enter a valid value before calculating LCA results again."
            )
            msg.setIcon(QMessageBox.Warning)
            QApplication.restoreOverrideCursor()
            msg.exec_()
            raise ReferenceFlowValueError("Reference flow == 0")

        # reference flows and related indexes
        self.func_units = cs["inv"]
        self.fu_activity_keys = [list(fu.keys())[0] for fu in self.func_units]
        self.fu_index = {k: i for i, k in enumerate(self.fu_activity_keys)}
        self.rev_fu_index = {v: k for k, v in self.fu_index.items()}

        # Methods and related indexes
        self.methods = cs["ia"]
        self.method_index = {m: i for i, m in enumerate(self.methods)}
        self.rev_method_index = {v: k for k, v in self.method_index.items()}

        # initial LCA and prepare method matrices
        self.lca = self._construct_lca()
        self.lca.lci(factorize=True)
        self.method_matrices = []
        for method in self.methods:
            self.lca.switch_method(method)
            self.method_matrices.append(self.lca.characterization_matrix)

        self.lca_scores = np.zeros((len(self.func_units), len(self.methods)))

        # data to be stored
        (self.rev_activity_dict, self.rev_product_dict, self.rev_biosphere_dict) = (
            self.lca.reverse_dict()
        )

        # Scaling
        self.scaling_factors = dict()

        # Technosphere product flows for a given reference flow
        self.technosphere_flows = dict()
        # Life cycle inventory (biosphere flows) by reference flow
        self.inventory = dict()
        # Inventory (biosphere flows) for specific reference flow (e.g. 2000x15000) and impact category.
        self.inventories = dict()
        # Inventory multiplied by scaling (relative impact on environment) per impact category.
        self.characterized_inventories = dict()

        # Summarized contributions for EF and processes.
        self.elementary_flow_contributions = np.zeros(
            (
                len(self.func_units),
                len(self.methods),
                self.lca.biosphere_matrix.shape[0],
            )
        )
        self.process_contributions = np.zeros(
            (
                len(self.func_units),
                len(self.methods),
                self.lca.technosphere_matrix.shape[0],
            )
        )

        self.func_unit_translation_dict = {}
        for fu in self.func_units:
            key = next(iter(fu))
            amount = fu[key]
            act = bd.get_activity(key)
            self.func_unit_translation_dict[
                (
                    f'{act["name"]} | '
                    f'{act["reference product"]} | '
                    f'{act["location"]} | '
                    f'{act["database"]} | '
                    f"{amount}"
                )
            ] = fu
        self.func_key_dict = {
            m: i for i, m in enumerate(self.func_unit_translation_dict.keys())
        }
        self.func_key_list = list(self.func_unit_translation_dict.keys())

    def _construct_lca(self):
        return bc.LCA(demand=self.func_units_dict, method=self.methods[0])

    def _perform_calculations(self):
        """Isolates the code which performs calculations to allow subclasses
        to either alter the code or redo calculations after matrix substitution.
        """
        for row, func_unit in enumerate(self.func_units):
            # Do the LCA for the current reference flow
            try:
                self.lca.redo_lci(func_unit)
            except Exception:
                # bw25 compatibility
                key = list(func_unit.keys())[0]
                self.lca.redo_lci({bd.get_activity(key).id: func_unit[key]})

            # Now update the:
            # - Scaling factors
            # - Technosphere flows
            # - Life cycle inventory
            # - Life-cycle inventory (disaggregated by contributing process)
            # for current reference flow
            self.scaling_factors.update({str(func_unit): self.lca.supply_array})
            self.technosphere_flows.update(
                {
                    str(func_unit): np.multiply(
                        self.lca.supply_array, self.lca.technosphere_matrix.diagonal()
                    )
                }
            )
            self.inventory.update(
                {str(func_unit): np.array(self.lca.inventory.sum(axis=1)).ravel()}
            )
            self.inventories.update({str(func_unit): self.lca.inventory})

            # Now, for each method, take the current reference flow and perform the impact assessment
            for col, cf_matrix in enumerate(self.method_matrices):
                self.lca.characterization_matrix = cf_matrix
                self.lca.lcia_calculation()
                self.lca_scores[row, col] = self.lca.score
                self.characterized_inventories[row, col] = (
                    self.lca.characterized_inventory.copy()
                )
                self.elementary_flow_contributions[row, col] = np.array(
                    self.lca.characterized_inventory.sum(axis=1)
                ).ravel()
                self.process_contributions[row, col] = (
                    self.lca.characterized_inventory.sum(axis=0)
                )
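
    # Shape sketch (illustrative; shapes follow the arrays allocated in
    # __init__): after calculate(), results line up as
    #   lca_scores[row, col]                    -> float score
    #   elementary_flow_contributions[row, col] -> (n_biosphere,) vector
    #   process_contributions[row, col]         -> (n_technosphere,) vector
    # where `row` indexes func_units and `col` indexes methods.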

    def calculate(self):
        self._perform_calculations()

    @property
    def func_units_dict(self) -> dict:
        """Return a dictionary of reference flows (key: demand), with every demand set to 1."""
        return {key: 1 for func_unit in self.func_units for key in func_unit}

    @property
    def all_databases(self) -> set:
        """Get all databases linked to the reference flows."""

        def get_dependents(dbs: set, dependents: list) -> set:
            for dep in (bd.databases[db].get("depends", []) for db in dependents):
                if not dbs.issuperset(dep):
                    dbs = get_dependents(dbs.union(dep), dep)
            return dbs

        dbs = set(f[0] for f in self.fu_activity_keys)
        dbs = get_dependents(dbs, list(dbs))
        # In rare cases, the default biosphere is not found as a dependency, see:
        # https://github.com/LCA-ActivityBrowser/activity-browser/issues/298
        # Always include it.
        dbs.add(bd.config.biosphere)
        return dbs

    def get_results_for_method(self, index: int = 0) -> pd.DataFrame:
        data = self.lca_scores[:, index]
        return pd.DataFrame(data, index=self.fu_activity_keys)

    @property
    def lca_scores_normalized(self) -> np.ndarray:
        """Normalize LCA scores by impact assessment method."""
        return self.lca_scores / self.lca_scores.max(axis=0)

    def get_normalized_scores_df(self) -> pd.DataFrame:
        """To be used for the currently inactive CorrelationPlot."""
        labels = [str(x + 1) for x in range(len(self.func_units))]
        return pd.DataFrame(data=self.lca_scores_normalized.T, columns=labels)

    def lca_scores_to_dataframe(self) -> pd.DataFrame:
        """Returns a dataframe of LCA scores using FU labels as index and
        methods as columns.
        """
        return pd.DataFrame(
            data=self.lca_scores,
            index=pd.Index(self.fu_activity_keys),
            columns=pd.Index(self.methods),
        )

    def get_all_metadata(self) -> None:
        """Populate AB_metadata with relevant database values.

        Set metadata in the form of a Pandas DataFrame for biosphere and
        technosphere databases for tables and additional aggregation.
        """
        AB_metadata.add_metadata(self.all_databases)


class Contributions(object):
    """Contribution Analysis built on top of the Multi-LCA class.

    This class requires instantiated MLCA and MetaDataStore objects.

    Parameters
    ----------
    mlca : `MLCA`
        An instantiated MLCA object

    Attributes
    ----------
    DEFAULT_ACT_FIELDS : list
        Default activity/reference flow column names
    DEFAULT_EF_FIELDS : list
        Default environmental flow column names
    mlca: `MLCA`
        Linked `MLCA` instance used for contribution calculations
    act_fields: list
        Technosphere-specific metadata column names
    ef_fields: list
        Biosphere-specific metadata column names

    Raises
    ------
    ValueError
        If the given `mlca` object is not an instance of `MLCA`

    """

    ACT = "process"
    EF = "elementary_flow"
    TECH = "technosphere"
    BIOS = "biosphere"

    DEFAULT_ACT_FIELDS = ["reference product", "name", "location", "unit", "database"]
    DEFAULT_EF_FIELDS = ["name", "categories", "type", "unit", "database"]

    DEFAULT_ACT_AGGREGATES = ["none"] + DEFAULT_ACT_FIELDS
    DEFAULT_EF_AGGREGATES = ["none"] + DEFAULT_EF_FIELDS

    def __init__(self, mlca):
        if not isinstance(mlca, MLCA):
            raise ValueError("Must pass an MLCA object. Passed:", type(mlca))
        self.mlca = mlca
        # Ensure MetaDataStore is updated.
        self.mlca.get_all_metadata()

        # Set default metadata keys (those not in the dataframe will be eliminated)
        self.act_fields = AB_metadata.get_existing_fields(self.DEFAULT_ACT_FIELDS)
        self.ef_fields = AB_metadata.get_existing_fields(self.DEFAULT_EF_FIELDS)

        # Specific datastructures for retrieving relevant MLCA data
        # inventory: inventory, reverse index, metadata keys, metadata fields
        self.inventory_data = {
            "biosphere": (
                self.mlca.inventory,
                self.mlca.rev_biosphere_dict,
                self.mlca.fu_activity_keys,
                self.ef_fields,
            ),
            "technosphere": (
                self.mlca.technosphere_flows,
                self.mlca.rev_activity_dict,
                self.mlca.fu_activity_keys,
                self.act_fields,
            ),
        }
        # aggregation: reverse index, metadata keys, metadata fields
        self.aggregate_data = {
            "biosphere": (
                self.mlca.rev_biosphere_dict,
                self.mlca.lca.biosphere_dict,
                self.ef_fields,
            ),
            "technosphere": (
                self.mlca.rev_activity_dict,
                self.mlca.lca.activity_dict,
                self.act_fields,
            ),
        }

    def normalize(self, contribution_array: np.ndarray) -> np.ndarray:
        """Normalise the contribution array.

        Parameters
        ----------
        contribution_array : A 2-dimensional contribution array

        Returns
        -------
        2-dimensional array of same shape, with scores normalized.

        """
        scores = abs(contribution_array.sum(axis=1, keepdims=True))
        return contribution_array / scores
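
    # Worked example (illustrative values): for np.array([[2.0, 2.0], [1.0, -3.0]])
    # the row sums are 4.0 and -2.0, so dividing by the absolute row sums gives
    # [[0.5, 0.5], [0.5, -1.5]]; each row's entries then sum to +1 or -1.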

    def _build_dict(
        self,
        contributions: np.ndarray,
        FU_M_index: dict,
        rev_dict: dict,
        limit: int,
        limit_type: str,
    ) -> dict:
        """Sort the given contribution array on method or reference flow column.

        Parameters
        ----------
        contributions : A 2-dimensional contribution array
        FU_M_index : Dictionary which maps the reference flows or methods to their matching columns
        rev_dict : 'reverse' dictionary used to map correct activity/method to its value
        limit : Number of top-contributing items to include
        limit_type : Either "number" or "percent"; see `ContributionAnalysis.sort_array` for a complete explanation

        Returns
        -------
        Top-contributing flows per method or activity

        """
        topcontribution_dict = dict()
        for fu_or_method, col in FU_M_index.items():

            contribution_col = contributions[col, :]
            total = contribution_col.sum()

            top_contribution = ca.sort_array(
                contribution_col, limit=limit, limit_type=limit_type, total=total
            )

            # split and calculate remaining rest sections for positive and negative part
            pos_rest = (
                np.sum(contribution_col[contribution_col > 0])
                - np.sum(top_contribution[top_contribution[:, 0] > 0][:, 0])
            )
            neg_rest = (
                np.sum(contribution_col[contribution_col < 0])
                - np.sum(top_contribution[top_contribution[:, 0] < 0][:, 0])
            )

            cont_per = OrderedDict()
            cont_per.update(
                {
                    ("Total", ""): total,
                    ("Rest (+)", ""): pos_rest,
                    ("Rest (-)", ""): neg_rest,
                }
            )
            for value, index in top_contribution:
                cont_per.update({rev_dict[index]: value})
            topcontribution_dict.update({fu_or_method: cont_per})
        return topcontribution_dict
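
    # Result sketch (illustrative): each value of the returned dict is an
    # OrderedDict of the form
    #     {("Total", ""): total, ("Rest (+)", ""): pos_rest,
    #      ("Rest (-)", ""): neg_rest, rev_dict[i]: value, ...}
    # so the "Total" and two "Rest" rows always precede the top contributors.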

    @staticmethod
    def get_labels(
        key_list: pd.MultiIndex,
        fields: Optional[list] = None,
        separator: str = " | ",
        max_length: Optional[int] = None,
        mask: Optional[list] = None,
    ) -> list:
        """Generate labels from metadata information.

        Setting max_length will wrap the label into a multi-line string if
        size is larger than max_length.

        Parameters
        ----------
        key_list : An index containing 'keys' to be retrieved from the MetaDataStore
        fields : List of column-names to be included from the MetaDataStore
        separator : Specific separator to use when joining strings together
        max_length : Allowed character length before string is wrapped over multiple lines
        mask : Instead of the metadata, this list is used to check keys against.
            Use if data is aggregated or keys do not exist in MetaDataStore

        Returns
        -------
        Translated and/or joined (and wrapped) labels matching the keys

        """
        fields = (
            fields if fields else ["name", "reference product", "location", "database"]
        )
        keys = (
            k for k in key_list
        )  # need to do this as the keys come from a pd.MultiIndex
        translated_keys = []
        for k in keys:
            if mask and k in mask:
                translated_keys.append(k)
            elif isinstance(k, str):
                translated_keys.append(k)
            elif k in AB_metadata.index:
                translated_keys.append(
                    separator.join(
                        [str(l) for l in list(AB_metadata.get_metadata(k, fields))]
                    )
                )
            else:
                translated_keys.append(separator.join([i for i in k if i != ""]))
        if max_length:
            translated_keys = [
                wrap_text(k, max_length=max_length) for k in translated_keys
            ]
        return translated_keys
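
    # Label sketch (illustrative metadata): a key whose fields resolve to
    # name "steel production", reference product "steel", location "DE" and
    # database "db" becomes "steel production | steel | DE | db" with the
    # default " | " separator.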

    @classmethod
    def join_df_with_metadata(
        cls,
        df: pd.DataFrame,
        x_fields: Optional[list] = None,
        y_fields: Optional[list] = None,
        special_keys: Optional[list] = None,
    ) -> pd.DataFrame:
        """Join a dataframe that has keys on the index with metadata.

        Metadata fields are defined in x_fields.
        If columns are also keys (and not, e.g. method names), they can also
        be replaced with metadata, if y_fields are provided.

        Parameters
        ----------
        df : Simple DataFrame containing processed data
        x_fields : List of additional columns to add from the MetaDataStore
        y_fields : List of column keys for the data in the df dataframe
        special_keys : List of specific items to place at the top of the dataframe

        Returns
        -------
        Expanded and metadata-annotated dataframe

        """

        # replace column keys with labels
        df.columns = cls.get_labels(df.columns, fields=y_fields)
        # Coerce index to MultiIndex if it currently isn't
        if not isinstance(df.index, pd.MultiIndex):
            df.index = pd.MultiIndex.from_tuples(ids_to_keys(df.index))

        # get metadata for rows
        keys = [k for k in df.index if k in AB_metadata.index]
        metadata = AB_metadata.get_metadata(keys, x_fields)

        # join data with metadata
        joined = metadata.join(df, how="outer")

        if special_keys:
            # replace index keys with labels
            try:  # first put Total, Rest (+) and Rest (-) in the first three positions in the dataframe
                complete_index = special_keys + keys
                joined = joined.reindex(complete_index, axis="index", fill_value=0.0)
            except Exception:
                log.error(
                    "Could not put 'Total', 'Rest (+)' and 'Rest (-)' on positions 0, 1 and 2 in the dataframe."
                )
        joined.index = cls.get_labels(joined.index, fields=x_fields)
        return joined

    def get_labelled_contribution_dict(
        self,
        cont_dict: dict,
        x_fields: list = None,
        y_fields: list = None,
        mask: list = None,
    ) -> pd.DataFrame:
        """Annotate the contribution dict with metadata.

        Parameters
        ----------
        cont_dict : Holds the contribution data, keyed by reference flow or method
        x_fields : X-axis fieldnames, these are usually the indexes/keys of specific processes
        y_fields : Column names specific to the cont_dict to be labelled
        mask : Used in case of aggregation or special cases where the usual way of using the metadata cannot be used

        Returns
        -------
        Annotated contribution dict inside a pandas dataframe

        """
        dfs = (
            pd.DataFrame(v.values(), index=list(v.keys()), columns=[k])
            for k, v in cont_dict.items()
        )
        df = pd.concat(dfs, sort=False, axis=1)
        # If the cont_dict has tuples for keys, coerce df.columns into MultiIndex
        if all(isinstance(k, tuple) for k in cont_dict.keys()):
            df.columns = pd.MultiIndex.from_tuples(df.columns)
        special_keys = [("Total", ""), ("Rest (+)", ""), ("Rest (-)", "")]
        # replace all 0 values with NaN and drop all rows with only NaNs
        df = df.replace(0, np.nan)

        # sort on the absolute mean of each row
        df_bot = deepcopy(df.loc[df.index.difference(special_keys)].dropna(how="all"))

        func = lambda row: np.nanmean(np.abs(row))
        if len(df_bot) > 1:  # but only sort if there is something to sort
            df_bot["_sort_me_"] = df_bot.select_dtypes(include=np.number).apply(func, axis=1)
            df_bot.sort_values(by="_sort_me_", ascending=False, inplace=True)
            del df_bot["_sort_me_"]

        df = pd.concat([df.iloc[:3, :], df_bot], axis=0)

        if not mask:
            joined = self.join_df_with_metadata(
                df, x_fields=x_fields, y_fields=y_fields, special_keys=special_keys
            )
        else:
            df.columns = self.get_labels(df.columns, fields=y_fields)
            keys = [k for k in df.index if k in mask]
            combined_keys = special_keys + keys
            # Reindex the combined_keys to ensure they always exist in the dataframe,
            # this avoids keys with 0 values not existing due to the 'dropna' action above.
            df = df.reindex(combined_keys, axis="index", fill_value=0.0)
            df.index = self.get_labels(df.index, mask=mask)
            joined = df
        if joined is not None:
            return joined.reset_index(drop=False)

    @staticmethod
    def adjust_table_unit(df: pd.DataFrame, method: Optional[tuple]) -> pd.DataFrame:
        """Given a dataframe, set the unit column to the given method's unit, or to a generic 'unit' if no method is given."""
        if "unit" not in df.columns:
            return df
        keys = df.index[~df["index"].isin({"Total", "Rest (+)", "Rest (-)"})]
        unit = bd.Method(method).metadata.get("unit") if method else "unit"
        df.loc[keys, "unit"] = unit
        return df

    @staticmethod
    def _build_inventory(
        inventory: dict, indices: dict, columns: list, fields: list
    ) -> pd.DataFrame:
        df = pd.DataFrame(inventory)
        df.index = pd.MultiIndex.from_tuples(ids_to_keys(indices.values()))
        df.columns = Contributions.get_labels(columns, max_length=30)
        metadata = AB_metadata.get_metadata(list(ids_to_keys(indices.values())), fields)
        joined = metadata.join(df)
        joined.reset_index(inplace=True, drop=True)
        return joined

    def inventory_df(
        self, inventory_type: str, columns: set = {"name", "database", "code"}
    ) -> pd.DataFrame:
        """Return an inventory dataframe with metadata of the given type."""
        try:
            data = self.inventory_data[inventory_type]
            appending = columns.difference(set(data[3]))
            for clmn in appending:
                data[3].append(clmn)
        except KeyError:
            raise ValueError(
                "Type must be either 'biosphere' or 'technosphere', "
                "'{}' given.".format(inventory_type)
            )
        return self._build_inventory(*data)

    def _build_lca_scores_df(self, scores: np.ndarray) -> pd.DataFrame:
        df = pd.DataFrame(
            scores,
            index=pd.MultiIndex.from_tuples(self.mlca.fu_activity_keys),
            columns=self.mlca.methods,
        )
        # Add amounts column.
        df["amount"] = [next(iter(fu.values()), 1.0) for fu in self.mlca.func_units]
        joined = Contributions.join_df_with_metadata(
            df, x_fields=self.act_fields, y_fields=None
        )
        # Precisely order the columns that are shown in the LCA Results overview
        # tab: “X kg of product Y from activity Z in location L, and database D”
        col_order = pd.Index(
            [
                "amount",
                "unit",
                "reference product",
                "name",
                "location",
                "database",
            ]
        )
        methods = joined.columns.difference(col_order, sort=False)
        joined = joined.loc[:, col_order.append(methods)]
        return joined.reset_index(drop=False)

    def lca_scores_df(self, normalized: bool = False) -> pd.DataFrame:
        """Return a metadata-annotated DataFrame of the LCA scores."""
        scores = (
            self.mlca.lca_scores if not normalized else self.mlca.lca_scores_normalized
        )
        return self._build_lca_scores_df(scores)

    @staticmethod
    def _build_contributions(data: np.ndarray, index: int, axis: int) -> np.ndarray:
        return data.take(index, axis=axis)

    def get_contributions(
        self, contribution, functional_unit=None, method=None, **kwargs
    ) -> np.ndarray:
        """Return a contribution matrix given the type and fu / method."""
        if all([functional_unit, method]) or not any([functional_unit, method]):
            raise ValueError(
                "It must be either by reference flow or by impact category. Provided:"
                "\n Reference flow: {} \n Impact Category: {}".format(
                    functional_unit, method
                )
            )
        dataset = {
            "process": self.mlca.process_contributions,
            "elementary_flow": self.mlca.elementary_flow_contributions,
        }
        if method:
            return self._build_contributions(
                dataset[contribution], self.mlca.method_index[method], 1
            )
        elif functional_unit:
            return self._build_contributions(
                dataset[contribution], self.mlca.func_key_dict[functional_unit], 0
            )

    def aggregate_by_parameters(
        self,
        contributions: np.ndarray,
        inventory: str,
        parameters: Union[str, list] = None,
    ):
        """Perform aggregation of the contribution data given parameters.

        Parameters
        ----------
        contributions : 2-dimensional contribution array
        inventory : Either 'biosphere' or 'technosphere', used to determine which inventory to use
        parameters : One or more parameters by which to aggregate the given contribution array.

        Returns
        -------
        aggregated : pd.DataFrame
            The aggregated 2-dimensional contribution array
        mask_index : dict
            Contains all of the values of the aggregation mask, linked to their indexes
        mask : list or dictview or None
            An optional list or dictview of the mask_index values

        """
        rev_index, keys, fields = self.aggregate_data[inventory]
        if not parameters:
            return contributions, rev_index, None

        df = pd.DataFrame(contributions).T
        columns = list(range(contributions.shape[0]))
        df.index = pd.MultiIndex.from_tuples(rev_index.values())
        metadata = AB_metadata.get_metadata(list(keys), fields)

        joined = metadata.join(df)
        joined.reset_index(inplace=True, drop=True)
        grouped = joined.groupby(parameters)
        aggregated = grouped[columns].sum()
        mask_index = {i: m for i, m in enumerate(aggregated.index)}

        return aggregated.T.values, mask_index, mask_index.values()
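
    # Aggregation sketch (illustrative): with parameters="location", the
    # per-flow axis of the contribution array collapses to one entry per
    # distinct location, and mask_index maps e.g. {0: "DE", 1: "NL"} back to
    # the rows of the aggregated array.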

    def _contribution_rows(self, contribution: str, aggregator=None):
        if aggregator is None:
            return self.act_fields if contribution == self.ACT else self.ef_fields
        return aggregator if isinstance(aggregator, list) else [aggregator]

    def _correct_method_index(self, mthd_indx: list) -> dict:
        """Amend the tuples used as impact method labels so that all
        tuples have the same length and print fully.

        NOTE THE AMENDED TUPLES ARE COPIED, THIS SHOULD NOT BE USED TO
        ASSIGN OR MODIFY THE UNDERLYING DATA STRUCTURES!

        mthd_indx: a list of tuples for the impact method names
        """
        method_tuple_length = max([len(k) for k in mthd_indx])
        conv_dict = dict()
        for v, mthd in enumerate(mthd_indx):
            if len(mthd) < method_tuple_length:
                _l = list(mthd)
                for i in range(len(mthd), method_tuple_length):
                    _l.append("")
                mthd = tuple(_l)
            conv_dict[mthd] = v
        return conv_dict
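
    # Padding sketch (illustrative): given [("IPCC", "GWP100"), ("ReCiPe",)],
    # the longest tuple has length 2, so ("ReCiPe",) is padded to
    # ("ReCiPe", "") before being used as a key in the returned dict.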

    def _contribution_index_cols(self, **kwargs) -> (dict, Optional[Iterable]):
        if kwargs.get("method") is not None:
            return self.mlca.fu_index, self.act_fields
        return self._correct_method_index(self.mlca.methods), None

    def top_elementary_flow_contributions(
        self,
        functional_unit: Optional[tuple] = None,
        method: Optional[tuple] = None,
        aggregator: Union[str, list, None] = None,
        limit: int = 5,
        normalize: bool = False,
        limit_type: str = "number",
        **kwargs,
    ) -> pd.DataFrame:
        """Return top EF contributions for either functional_unit or method.

        * If functional_unit: Compare the reference flow against all considered
        impact assessment methods.
        * If method: Compare the method against all involved processes.

        Parameters
        ----------
        functional_unit : The reference flow to compare all considered impact categories against
        method : The method to compare all considered reference flows against
        aggregator : Used to aggregate EF contributions over certain columns
        limit : The number of top contributions to consider
        normalize : Determines whether or not to normalize the contribution values
        limit_type : The type of limit, either 'number' or 'percent'

        Returns
        -------
        Annotated top-contribution dataframe

        """
        contributions = self.get_contributions(
            self.EF, functional_unit, method, **kwargs
        )

        x_fields = self._contribution_rows(self.EF, aggregator)
        index, y_fields = self._contribution_index_cols(
            functional_unit=functional_unit, method=method
        )
        contributions, rev_index, mask = self.aggregate_by_parameters(
            contributions, self.BIOS, aggregator
        )

        # Normalise if required
        if normalize:
            contributions = self.normalize(contributions)

        top_cont_dict = self._build_dict(
            contributions, index, rev_index, limit, limit_type
        )
        labelled_df = self.get_labelled_contribution_dict(
            top_cont_dict, x_fields=x_fields, y_fields=y_fields, mask=mask
        )
        self.adjust_table_unit(labelled_df, method)
        return labelled_df

    def top_process_contributions(
        self,
        functional_unit: Optional[tuple] = None,
        method: Optional[tuple] = None,
        aggregator: Union[str, list, None] = None,
        limit: int = 5,
        normalize: bool = False,
        limit_type: str = "number",
        **kwargs,
    ) -> pd.DataFrame:
        """Return top process contributions for functional_unit or method.

        * If functional_unit: Compare the reference flow against all considered
        impact assessment methods.
        * If method: Compare the method against all involved processes.

        Parameters
        ----------
        functional_unit : The reference flow to compare all considered impact categories against
        method : The method to compare all considered reference flows against
        aggregator : Used to aggregate process contributions over certain columns
        limit : The number of top contributions to consider
        normalize : Determines whether or not to normalize the contribution values
        limit_type : The type of limit, either 'number' or 'percent'

        Returns
        -------
        Annotated top-contribution dataframe

        """
        contributions = self.get_contributions(
            self.ACT, functional_unit, method, **kwargs
        )

        x_fields = self._contribution_rows(self.ACT, aggregator)
        index, y_fields = self._contribution_index_cols(
            functional_unit=functional_unit, method=method
        )
        contributions, rev_index, mask = self.aggregate_by_parameters(
            contributions, self.TECH, aggregator
        )

        # Normalise if required
        if normalize:
            contributions = self.normalize(contributions)

        top_cont_dict = self._build_dict(
            contributions, index, rev_index, limit, limit_type
        )
        labelled_df = self.get_labelled_contribution_dict(
            top_cont_dict, x_fields=x_fields, y_fields=y_fields, mask=mask
        )
        self.adjust_table_unit(labelled_df, method)
        return labelled_df


def ids_to_keys(index_list):
    return [bd.get_activity(i).key if isinstance(i, int) else i for i in index_list]
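
# Resolution sketch (illustrative): ids_to_keys([42, ("db", "code")]) resolves
# the integer id 42 to its ("database", "code") key via bd.get_activity, and
# passes the existing tuple key through unchanged.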