LCA-ActivityBrowser / activity-browser, build 12235954285
09 Dec 2024 12:47PM UTC. Coverage: 53.283% (-0.9%) from 54.185%.

Pull Request #1046: First-Tier contribution analysis tab
Merge 2681204bd into 023eb5a8d (via github / web-flow)

22 of 310 new or added lines in 4 files covered (7.1%).
10 existing lines in 5 files now uncovered.
8367 of 15703 relevant lines covered (53.28%); 0.53 hits per line.
Source file: /activity_browser/bwutils/multilca.py (file coverage: 20.85%)

from collections import OrderedDict
from copy import deepcopy
from typing import Iterable, Optional, Tuple, Union
from logging import getLogger

import bw2analyzer as ba
import bw2calc as bc
import numpy as np
import pandas as pd
from PySide2.QtWidgets import QApplication, QMessageBox

from activity_browser.mod import bw2data as bd

from .commontasks import wrap_text
from .errors import ReferenceFlowValueError
from .metadata import AB_metadata

log = getLogger(__name__)
ca = ba.ContributionAnalysis()


class MLCA(object):
    """Wrapper class for performing LCA calculations with many reference flows and impact categories.

    Needs to be passed a brightway ``calculation_setup`` name.

    This class does not subclass the `LCA` class, and performs all
    calculations upon instantiation.

    Initialization creates `self.lca_scores`, which is a NumPy array
    of LCA scores, with rows of reference flows and columns of impact categories.
    Ordering is the same as in the `calculation_setup`.

    This class is adapted from `bw2calc.multi_lca.MultiLCA` and includes a
    number of additional attributes required to perform process- and
    elementary flow contribution analysis (see class `Contributions` below).

    Parameters
    ----------
    cs_name : str
        Name of the calculation setup

    Attributes
    ----------
    func_units_dict
    all_databases
    lca_scores_normalized
    func_units : list
        List of dictionaries, each containing the reference flow key and
        its required output
    fu_activity_keys : list
        The reference flow keys
    fu_index : dict
        Links the reference flows to a specific index
    rev_fu_index : dict
        Same as `fu_index` but using the indexes as keys
    methods : list
        The impact categories of the calculation setup
    method_index : dict
        Links the impact categories to a specific index
    rev_method_index : dict
        Same as `method_index` but using the indexes as keys
    lca : `bw2calc.lca.LCA`
        Brightway LCA instance used to perform LCA, LCI and LCIA
        calculations
    method_matrices : list
        Contains the characterization matrix for each impact category.
    lca_scores : `numpy.ndarray`
        2-dimensional array of shape (`func_units`, `methods`) holding the
        calculated LCA scores of each combination of reference flow and
        impact assessment method
    rev_activity_dict : dict
        See `bw2calc.lca.LCA.reverse_dict`
    rev_product_dict : dict
        See `bw2calc.lca.LCA.reverse_dict`
    rev_biosphere_dict : dict
        See `bw2calc.lca.LCA.reverse_dict`
    scaling_factors : dict
        Contains the life-cycle inventory scaling factors per reference flow
    technosphere_flows : dict
        Contains the calculated technosphere flows per reference flow
    inventory : dict
        Life cycle inventory (biosphere flows) per reference flow
    inventories : dict
        Biosphere flows per reference flow and impact category combination
    characterized_inventories : dict
        Inventory multiplied by scaling (relative impact on environment) per
        reference flow and impact category combination
    elementary_flow_contributions : `numpy.ndarray`
        3-dimensional array of shape (`func_units`, `methods`, `biosphere`)
        which holds the characterized inventory results summed along the
        technosphere axis
    process_contributions : `numpy.ndarray`
        3-dimensional array of shape (`func_units`, `methods`, `technosphere`)
        which holds the characterized inventory results summed along the
        biosphere axis
    func_unit_translation_dict : dict
        Contains the reference flow key and its expected output linked to
        the brightway activity label.
    func_key_dict : dict
        An index of the brightway activity labels
    func_key_list : list
        A derivative of `func_key_dict` containing just the keys

    Raises
    ------
    ValueError
        If the given `cs_name` cannot be found in brightway calculation_setups

    """
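
    # Minimal usage sketch, assuming a calculation setup named "my_setup"
    # exists in the active brightway project:
    #
    #     mlca = MLCA("my_setup")
    #     mlca.calculate()
    #     mlca.lca_scores                # shape (n_reference_flows, n_methods)
    #     mlca.lca_scores_to_dataframe()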

    def __init__(self, cs_name: str):
        try:
            cs = bd.calculation_setups[cs_name]
        except KeyError:
            raise ValueError(f"{cs_name} is not a known `calculation_setup`.")

        # check if all values are non-zero
        # cs['inv'] contains all reference flows (rf),
        # all values of rf are the individual reference flow items.
        if [v for rf in cs["inv"] for v in rf.values() if v == 0]:
            msg = QMessageBox()
            msg.setWindowTitle("Reference flows equal 0")
            msg.setText("All reference flows must be non-zero.")
            msg.setInformativeText(
                "Please enter a valid value before calculating LCA results again."
            )
            msg.setIcon(QMessageBox.Warning)
            QApplication.restoreOverrideCursor()
            msg.exec_()
            raise ReferenceFlowValueError("Reference flow == 0")

        # reference flows and related indexes
        self.func_units = cs["inv"]
        self.fu_activity_keys = [list(fu.keys())[0] for fu in self.func_units]
        self.fu_index = {k: i for i, k in enumerate(self.fu_activity_keys)}
        self.rev_fu_index = {v: k for k, v in self.fu_index.items()}

        # Methods and related indexes
        self.methods = cs["ia"]
        self.method_index = {m: i for i, m in enumerate(self.methods)}
        self.rev_method_index = {v: k for k, v in self.method_index.items()}

        # initial LCA and prepare method matrices
        self.lca = self._construct_lca()
        self.lca.lci(factorize=True)
        self.method_matrices = []
        for method in self.methods:
            self.lca.switch_method(method)
            self.method_matrices.append(self.lca.characterization_matrix)

        self.lca_scores = np.zeros((len(self.func_units), len(self.methods)))

        # data to be stored
        (self.rev_activity_dict, self.rev_product_dict, self.rev_biosphere_dict) = (
            self.lca.reverse_dict()
        )

        # Scaling
        self.scaling_factors = dict()

        # Technosphere product flows for a given reference flow
        self.technosphere_flows = dict()
        # Life cycle inventory (biosphere flows) by reference flow
        self.inventory = dict()
        # Inventory (biosphere flows) for specific reference flow (e.g. 2000x15000) and impact category.
        self.inventories = dict()
        # Inventory multiplied by scaling (relative impact on environment) per impact category.
        self.characterized_inventories = dict()

        # Summarized contributions for EF and processes.
        self.elementary_flow_contributions = np.zeros(
            (
                len(self.func_units),
                len(self.methods),
                self.lca.biosphere_matrix.shape[0],
            )
        )
        self.process_contributions = np.zeros(
            (
                len(self.func_units),
                len(self.methods),
                self.lca.technosphere_matrix.shape[0],
            )
        )

        self.func_unit_translation_dict = {}
        for fu in self.func_units:
            key = next(iter(fu))
            amount = fu[key]
            act = bd.get_activity(key)
            self.func_unit_translation_dict[
                (
                    f'{act["name"]} | '
                    f'{act["reference product"]} | '
                    f'{act["location"]} | '
                    f'{act["database"]} | '
                    f"{amount}"
                )
            ] = fu
        self.func_key_dict = {
            m: i for i, m in enumerate(self.func_unit_translation_dict.keys())
        }
        self.func_key_list = list(self.func_unit_translation_dict.keys())

    def _construct_lca(self):
        """Construct the LCA object with all reference flows as demand and the first impact category."""
        return bc.LCA(demand=self.func_units_dict, method=self.methods[0])

    def _perform_calculations(self):
        """Isolates the code which performs calculations to allow subclasses
        to either alter the code or redo calculations after matrix substitution.
        """
        for row, func_unit in enumerate(self.func_units):
            # Do the LCA for the current reference flow
            try:
                self.lca.redo_lci(func_unit)
            except Exception:
                # bw25 compatibility: demand dictionaries use integer activity ids
                key = list(func_unit.keys())[0]
                self.lca.redo_lci({bd.get_activity(key).id: func_unit[key]})

            # Now update the:
            # - Scaling factors
            # - Technosphere flows
            # - Life cycle inventory
            # - Life-cycle inventory (disaggregated by contributing process)
            # for current reference flow
            self.scaling_factors.update({str(func_unit): self.lca.supply_array})
            self.technosphere_flows.update(
                {
                    str(func_unit): np.multiply(
                        self.lca.supply_array, self.lca.technosphere_matrix.diagonal()
                    )
                }
            )
            self.inventory.update(
                {str(func_unit): np.array(self.lca.inventory.sum(axis=1)).ravel()}
            )
            self.inventories.update({str(func_unit): self.lca.inventory})

            # Now, for each method, take the current reference flow and do inventory analysis
            for col, cf_matrix in enumerate(self.method_matrices):
                self.lca.characterization_matrix = cf_matrix
                self.lca.lcia_calculation()
                self.lca_scores[row, col] = self.lca.score
                self.characterized_inventories[row, col] = (
                    self.lca.characterized_inventory.copy()
                )
                self.elementary_flow_contributions[row, col] = np.array(
                    self.lca.characterized_inventory.sum(axis=1)
                ).ravel()
                self.process_contributions[row, col] = (
                    self.lca.characterized_inventory.sum(axis=0)
                )

    def calculate(self):
        """Run the LCI/LCIA calculations for all reference flows and impact categories."""
        self._perform_calculations()
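
    # Sketch of the subclassing hook described in `_perform_calculations`,
    # assuming a hypothetical subclass that substitutes matrix data before
    # redoing the calculations:
    #
    #     class ScenarioMLCA(MLCA):
    #         def _construct_lca(self):
    #             lca = super()._construct_lca()
    #             # ... substitute technosphere/biosphere matrix data here ...
    #             return lca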

    @property
    def func_units_dict(self) -> dict:
        """Return a dictionary of reference flow (key, demand)."""
        return {key: 1 for func_unit in self.func_units for key in func_unit}

    @property
    def all_databases(self) -> set:
        """Get all databases linked to the reference flows."""

        def get_dependents(dbs: set, dependents: list) -> set:
            for dep in (bd.databases[db].get("depends", []) for db in dependents):
                if not dbs.issuperset(dep):
                    dbs = get_dependents(dbs.union(dep), dep)
            return dbs

        dbs = set(f[0] for f in self.fu_activity_keys)
        dbs = get_dependents(dbs, list(dbs))
        # In rare cases, the default biosphere is not found as a dependency, see:
        # https://github.com/LCA-ActivityBrowser/activity-browser/issues/298
        # Always include it.
        dbs.add(bd.config.biosphere)
        return dbs
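
    # Illustration with hypothetical databases: if the reference flows live in
    # "db_a", "db_a" depends on "db_b", and "db_b" depends on "biosphere3",
    # the recursion yields {"db_a", "db_b", "biosphere3"}, plus the configured
    # default biosphere.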

    def get_results_for_method(self, index: int = 0) -> pd.DataFrame:
        """Return the LCA scores of the impact category at `index` as a DataFrame."""
        data = self.lca_scores[:, index]
        return pd.DataFrame(data, index=self.fu_activity_keys)

    @property
    def lca_scores_normalized(self) -> np.ndarray:
        """Normalize LCA scores to the per-method (column-wise) maximum."""
        return self.lca_scores / self.lca_scores.max(axis=0)
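
    # Example with hypothetical scores: two reference flows and one method,
    #
    #     lca_scores = np.array([[2.0], [4.0]])
    #     lca_scores_normalized  ->  array([[0.5], [1.0]])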

    def get_normalized_scores_df(self) -> pd.DataFrame:
        """To be used for the currently inactive CorrelationPlot."""
        labels = [str(x + 1) for x in range(len(self.func_units))]
        return pd.DataFrame(data=self.lca_scores_normalized.T, columns=labels)

    def lca_scores_to_dataframe(self) -> pd.DataFrame:
        """Return a dataframe of LCA scores, with reference flow keys as index
        and methods as columns.
        """
        return pd.DataFrame(
            data=self.lca_scores,
            index=pd.Index(self.fu_activity_keys),
            columns=pd.Index(self.methods),
        )

    def get_all_metadata(self) -> None:
        """Populate AB_metadata with relevant database values.

        Set metadata in the form of a Pandas DataFrame for biosphere and
        technosphere databases for tables and additional aggregation.
        """
        AB_metadata.add_metadata(self.all_databases)


class Contributions(object):
    """Contribution Analysis built on top of the Multi-LCA class.

    This class requires instantiated MLCA and MetaDataStore objects.

    Parameters
    ----------
    mlca : `MLCA`
        An instantiated MLCA object

    Attributes
    ----------
    DEFAULT_ACT_FIELDS : list
        Default activity/reference flow column names
    DEFAULT_EF_FIELDS : list
        Default environmental flow column names
    mlca : `MLCA`
        Linked `MLCA` instance used for contribution calculations
    act_fields : list
        Technosphere-specific metadata column names
    ef_fields : list
        Biosphere-specific metadata column names

    Raises
    ------
    ValueError
        If the given `mlca` object is not an instance of `MLCA`

    """
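
    # Minimal usage sketch, assuming an already calculated MLCA instance
    # `mlca` and using the first impact category of its calculation setup:
    #
    #     contributions = Contributions(mlca)
    #     df = contributions.top_process_contributions(
    #         method=mlca.methods[0], limit=5, limit_type="number"
    #     )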

    ACT = "process"
    EF = "elementary_flow"
    TECH = "technosphere"
    BIOS = "biosphere"

    DEFAULT_ACT_FIELDS = ["reference product", "name", "location", "unit", "database"]
    DEFAULT_EF_FIELDS = ["name", "categories", "type", "unit", "database"]

    DEFAULT_ACT_AGGREGATES = ["none"] + DEFAULT_ACT_FIELDS
    DEFAULT_EF_AGGREGATES = ["none"] + DEFAULT_EF_FIELDS

    def __init__(self, mlca):
        if not isinstance(mlca, MLCA):
            raise ValueError(f"Must pass an MLCA object. Passed: {type(mlca)}")
        self.mlca = mlca
        # Ensure MetaDataStore is updated.
        self.mlca.get_all_metadata()

        # Set default metadata keys (those not in the dataframe will be eliminated)
        self.act_fields = AB_metadata.get_existing_fields(self.DEFAULT_ACT_FIELDS)
        self.ef_fields = AB_metadata.get_existing_fields(self.DEFAULT_EF_FIELDS)

        # Specific data structures for retrieving relevant MLCA data
        # inventory: inventory, reverse index, metadata keys, metadata fields
        self.inventory_data = {
            "biosphere": (
                self.mlca.inventory,
                self.mlca.rev_biosphere_dict,
                self.mlca.fu_activity_keys,
                self.ef_fields,
            ),
            "technosphere": (
                self.mlca.technosphere_flows,
                self.mlca.rev_activity_dict,
                self.mlca.fu_activity_keys,
                self.act_fields,
            ),
        }
        # aggregation: reverse index, metadata keys, metadata fields
        self.aggregate_data = {
            "biosphere": (
                self.mlca.rev_biosphere_dict,
                self.mlca.lca.biosphere_dict,
                self.ef_fields,
            ),
            "technosphere": (
                self.mlca.rev_activity_dict,
                self.mlca.lca.activity_dict,
                self.act_fields,
            ),
        }

    def normalize(self, contribution_array: np.ndarray) -> np.ndarray:
        """Normalize the contribution array.

        Each row is divided by the absolute value of its row total.

        Parameters
        ----------
        contribution_array : A 2-dimensional contribution array

        Returns
        -------
        2-dimensional array of the same shape, with scores normalized.

        """
        scores = abs(contribution_array.sum(axis=1, keepdims=True))
        return contribution_array / scores
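
    # Worked example with hypothetical values: each row is divided by the
    # absolute value of its own sum, so
    #
    #     normalize(np.array([[3.0, 1.0], [-2.0, -2.0]]))
    #     ->  array([[ 0.75,  0.25],
    #                [-0.50, -0.50]])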

    def _build_dict(
        self,
        contributions: np.ndarray,
        FU_M_index: dict,
        rev_dict: dict,
        limit: int,
        limit_type: str,
        total_range: bool,
    ) -> dict:
        """Sort the given contribution array on method or reference flow column.

        Parameters
        ----------
        contributions : A 2-dimensional contribution array
        FU_M_index : Dictionary which maps the reference flows or methods to their matching columns
        rev_dict : 'reverse' dictionary used to map correct activity/method to its value
        limit : Number of top-contributing items to include
        limit_type : Either "number" or "percent", see ContributionAnalysis.sort_array for a complete explanation
        total_range : If True, the total is the sum of absolute contributions (the range); if False, the net score

        Returns
        -------
        Top-contributing flows per method or activity

        """
        topcontribution_dict = dict()
        for fu_or_method, col in FU_M_index.items():
            contribution_col = contributions[col, :]
            if total_range:  # total is based on the range
                total = np.abs(contribution_col).sum()
            else:  # total is based on the score
                total = contribution_col.sum()

            top_contribution = ca.sort_array(
                contribution_col, limit=limit, limit_type=limit_type, total=total
            )

            # split and calculate remaining rest sections for positive and negative part
            pos_rest = (
                np.sum(contribution_col[contribution_col > 0])
                - np.sum(top_contribution[top_contribution[:, 0] > 0][:, 0])
            )
            neg_rest = (
                np.sum(contribution_col[contribution_col < 0])
                - np.sum(top_contribution[top_contribution[:, 0] < 0][:, 0])
            )

            cont_per = OrderedDict()
            cont_per.update(
                {
                    ("Total", ""): total,
                    ("Rest (+)", ""): pos_rest,
                    ("Rest (-)", ""): neg_rest,
                }
            )
            for value, index in top_contribution:
                cont_per.update({rev_dict[index]: value})
            topcontribution_dict.update({fu_or_method: cont_per})
        return topcontribution_dict
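
    # Worked example with a hypothetical contribution column [4.0, -1.0]:
    #
    #     total_range=True   ->  total = |4.0| + |-1.0| = 5.0  (the range)
    #     total_range=False  ->  total = 4.0 - 1.0 = 3.0       (the net score)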

    @staticmethod
    def get_labels(
        key_list: pd.MultiIndex,
        fields: Optional[list] = None,
        separator: str = " | ",
        max_length: Optional[int] = None,
        mask: Optional[list] = None,
    ) -> list:
        """Generate labels from metadata information.

        Setting max_length will wrap the label into a multi-line string if
        size is larger than max_length.

        Parameters
        ----------
        key_list : An index containing 'keys' to be retrieved from the MetaDataStore
        fields : List of column-names to be included from the MetaDataStore
        separator : Specific separator to use when joining strings together
        max_length : Allowed character length before string is wrapped over multiple lines
        mask : Instead of the metadata, this list is used to check keys against.
            Use if data is aggregated or keys do not exist in MetaDataStore

        Returns
        -------
        Translated and/or joined (and wrapped) labels matching the keys

        """
        fields = (
            fields if fields else ["name", "reference product", "location", "database"]
        )
        keys = (
            k for k in key_list
        )  # need to do this as the keys come from a pd.MultiIndex
        translated_keys = []
        for k in keys:
            if mask and k in mask:
                translated_keys.append(k)
            elif isinstance(k, str):
                translated_keys.append(k)
            elif k in AB_metadata.index:
                translated_keys.append(
                    separator.join(
                        [str(l) for l in list(AB_metadata.get_metadata(k, fields))]
                    )
                )
            else:
                translated_keys.append(separator.join([i for i in k if i != ""]))
        if max_length:
            translated_keys = [
                wrap_text(k, max_length=max_length) for k in translated_keys
            ]
        return translated_keys

    @classmethod
    def join_df_with_metadata(
        cls,
        df: pd.DataFrame,
        x_fields: Optional[list] = None,
        y_fields: Optional[list] = None,
        special_keys: Optional[list] = None,
    ) -> pd.DataFrame:
        """Join a dataframe that has keys on the index with metadata.

        Metadata fields are defined in x_fields.
        If columns are also keys (and not, e.g. method names), they can also
        be replaced with metadata, if y_fields are provided.

        Parameters
        ----------
        df : Simple DataFrame containing processed data
        x_fields : List of additional columns to add from the MetaDataStore
        y_fields : List of column keys for the data in the df dataframe
        special_keys : List of specific items to place at the top of the dataframe

        Returns
        -------
        Expanded and metadata-annotated dataframe

        """

        # replace column keys with labels
        df.columns = cls.get_labels(df.columns, fields=y_fields)
        # Coerce index to MultiIndex if it currently isn't
        if not isinstance(df.index, pd.MultiIndex):
            df.index = pd.MultiIndex.from_tuples(ids_to_keys(df.index))

        # get metadata for rows
        keys = [k for k in df.index if k in AB_metadata.index]
        metadata = AB_metadata.get_metadata(keys, x_fields)

        # join data with metadata
        joined = metadata.join(df, how="outer")

        if special_keys:
            # replace index keys with labels
            try:  # first put Total, Rest (+) and Rest (-) in the first three positions in the dataframe
                complete_index = special_keys + keys
                joined = joined.reindex(complete_index, axis="index", fill_value=0.0)
            except Exception:
                log.error(
                    "Could not put 'Total', 'Rest (+)' and 'Rest (-)' on positions 0, 1 and 2 in the dataframe."
                )
        joined.index = cls.get_labels(joined.index, fields=x_fields)
        return joined

    def get_labelled_contribution_dict(
        self,
        cont_dict: dict,
        x_fields: Optional[list] = None,
        y_fields: Optional[list] = None,
        mask: Optional[list] = None,
    ) -> pd.DataFrame:
        """Annotate the contribution dict with metadata.

        Parameters
        ----------
        cont_dict : Holds the contribution data, keyed by reference flow or method
        x_fields : X-axis fieldnames; these are usually the indexes/keys of specific processes
        y_fields : Column names specific to the cont_dict to be labelled
        mask : Used in case of aggregation or other special cases where the metadata cannot be used as usual

        Returns
        -------
        Annotated contribution dict inside a pandas dataframe

        """
        dfs = (
            pd.DataFrame(v.values(), index=list(v.keys()), columns=[k])
            for k, v in cont_dict.items()
        )
        df = pd.concat(dfs, sort=False, axis=1)
        # If the cont_dict has tuples for keys, coerce df.columns into MultiIndex
        if all(isinstance(k, tuple) for k in cont_dict.keys()):
            df.columns = pd.MultiIndex.from_tuples(df.columns)
        special_keys = [("Total", ""), ("Rest (+)", ""), ("Rest (-)", "")]
        # replace all 0 values with NaN and drop all rows with only NaNs
        df = df.replace(0, np.nan)

        # sort on absolute mean of a row
        df_bot = deepcopy(df.loc[df.index.difference(special_keys)].dropna(how="all"))

        func = lambda row: np.nanmean(np.abs(row))
        if len(df_bot) > 1:  # but only sort if there is something to sort
            df_bot["_sort_me_"] = df_bot.select_dtypes(include=np.number).apply(
                func, axis=1
            )
            df_bot.sort_values(by="_sort_me_", ascending=False, inplace=True)
            del df_bot["_sort_me_"]

        df = pd.concat([df.iloc[:3, :], df_bot], axis=0)

        if not mask:
            joined = self.join_df_with_metadata(
                df, x_fields=x_fields, y_fields=y_fields, special_keys=special_keys
            )
        else:
            df.columns = self.get_labels(df.columns, fields=y_fields)
            keys = [k for k in df.index if k in mask]
            combined_keys = special_keys + keys
            # Reindex with the combined_keys to ensure they always exist in the dataframe;
            # this avoids keys with 0 values disappearing due to the 'dropna' action above.
            df = df.reindex(combined_keys, axis="index", fill_value=0.0)
            df.index = self.get_labels(df.index, mask=mask)
            joined = df
        if joined is not None:
            return joined.reset_index(drop=False)

    @staticmethod
    def adjust_table_unit(df: pd.DataFrame, method: Optional[tuple]) -> pd.DataFrame:
        """Given a dataframe, adjust the unit of the table to either match the given method, or not exist."""
        if "unit" not in df.columns:
            return df
        keys = df.index[~df["index"].isin({"Total", "Rest (+)", "Rest (-)"})]
        unit = bd.Method(method).metadata.get("unit") if method else "unit"
        df.loc[keys, "unit"] = unit
        return df

    @staticmethod
    def _build_inventory(
        inventory: dict, indices: dict, columns: list, fields: list
    ) -> pd.DataFrame:
        """Construct a metadata-annotated inventory DataFrame."""
        df = pd.DataFrame(inventory)
        df.index = pd.MultiIndex.from_tuples(ids_to_keys(indices.values()))
        df.columns = Contributions.get_labels(columns, max_length=30)
        metadata = AB_metadata.get_metadata(list(ids_to_keys(indices.values())), fields)
        joined = metadata.join(df)
        joined.reset_index(inplace=True, drop=True)
        return joined

    def inventory_df(
        self, inventory_type: str, columns: frozenset = frozenset({"name", "database", "code"})
    ) -> pd.DataFrame:
        """Return an inventory dataframe with metadata of the given type."""
        try:
            data = self.inventory_data[inventory_type]
            appending = columns.difference(set(data[3]))
            for clmn in appending:
                data[3].append(clmn)
        except KeyError:
            raise ValueError(
                "Type must be either 'biosphere' or 'technosphere', "
                "'{}' given.".format(inventory_type)
            )
        return self._build_inventory(*data)

    def _build_lca_scores_df(self, scores: np.ndarray) -> pd.DataFrame:
        """Construct a metadata-annotated DataFrame of LCA scores."""
        df = pd.DataFrame(
            scores,
            index=pd.MultiIndex.from_tuples(self.mlca.fu_activity_keys),
            columns=self.mlca.methods,
        )
        # Add amounts column.
        df["amount"] = [next(iter(fu.values()), 1.0) for fu in self.mlca.func_units]
        joined = Contributions.join_df_with_metadata(
            df, x_fields=self.act_fields, y_fields=None
        )
        # Precisely order the columns that are shown in the LCA Results overview
        # tab: "X kg of product Y from activity Z in location L, and database D"
        col_order = pd.Index(
            [
                "amount",
                "unit",
                "reference product",
                "name",
                "location",
                "database",
            ]
        )
        methods = joined.columns.difference(col_order, sort=False)
        joined = joined.loc[:, col_order.append(methods)]
        return joined.reset_index(drop=False)

    def lca_scores_df(self, normalized: bool = False) -> pd.DataFrame:
        """Return a metadata-annotated DataFrame of the LCA scores."""
        scores = (
            self.mlca.lca_scores if not normalized else self.mlca.lca_scores_normalized
        )
        return self._build_lca_scores_df(scores)

    @staticmethod
    def _build_contributions(data: np.ndarray, index: int, axis: int) -> np.ndarray:
        """Take a slice of the contribution array along the given axis."""
        return data.take(index, axis=axis)

    def get_contributions(
        self, contribution, functional_unit=None, method=None, **kwargs
    ) -> np.ndarray:
        """Return a contribution matrix given the type and fu / method."""
        if all([functional_unit, method]) or not any([functional_unit, method]):
            raise ValueError(
                "Provide either a reference flow or an impact category, not both "
                "or neither. Provided:"
                "\n Reference flow: {} \n Impact Category: {}".format(
                    functional_unit, method
                )
            )
        dataset = {
            "process": self.mlca.process_contributions,
            "elementary_flow": self.mlca.elementary_flow_contributions,
        }
        if method:
            return self._build_contributions(
                dataset[contribution], self.mlca.method_index[method], 1
            )
        elif functional_unit:
            return self._build_contributions(
                dataset[contribution], self.mlca.func_key_dict[functional_unit], 0
            )

    def aggregate_by_parameters(
        self,
        contributions: np.ndarray,
        inventory: str,
        parameters: Union[str, list] = None,
    ):
        """Perform aggregation of the contribution data given parameters.

        Parameters
        ----------
        contributions : 2-dimensional contribution array
        inventory : Either 'biosphere' or 'technosphere', used to determine which inventory to use
        parameters : One or more parameters by which to aggregate the given contribution array.

        Returns
        -------
        aggregated : np.ndarray
            The aggregated 2-dimensional contribution array
        mask_index : dict
            Contains all of the values of the aggregation mask, linked to their indexes
        mask : list or dictview or None
            An optional list or dictview of the mask_index values

        """
        rev_index, keys, fields = self.aggregate_data[inventory]
        if not parameters:
            return contributions, rev_index, None

        df = pd.DataFrame(contributions).T
        columns = list(range(contributions.shape[0]))
        df.index = pd.MultiIndex.from_tuples(rev_index.values())
        metadata = AB_metadata.get_metadata(list(keys), fields)

        joined = metadata.join(df)
        joined.reset_index(inplace=True, drop=True)
        grouped = joined.groupby(parameters)
        aggregated = grouped[columns].sum()
        mask_index = {i: m for i, m in enumerate(aggregated.index)}

        return aggregated.T.values, mask_index, mask_index.values()
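
    # Sketch with hypothetical shapes: a (2 methods x 5 flows) contribution
    # array whose flows fall into two 'name' groups collapses to a (2 x 2)
    # array, with mask_index == {0: "name_a", 1: "name_b"}.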

    def _contribution_rows(self, contribution: str, aggregator=None):
        """Return the metadata fields used as row labels, or the aggregator column(s)."""
        if aggregator is None:
            return self.act_fields if contribution == self.ACT else self.ef_fields
        return aggregator if isinstance(aggregator, list) else [aggregator]

    def _correct_method_index(self, mthd_indx: list) -> dict:
        """Amend the tuples used as impact method labels so that all tuples
        have the same length and are fully printed.

        NOTE: THE AMENDED TUPLES ARE COPIES; THIS SHOULD NOT BE USED TO
        ASSIGN OR MODIFY THE UNDERLYING DATA STRUCTURES!

        mthd_indx: a list of tuples of the impact method names
        """
        method_tuple_length = max([len(k) for k in mthd_indx])
        conv_dict = dict()
        for v, mthd in enumerate(mthd_indx):
            if len(mthd) < method_tuple_length:
                _l = list(mthd)
                for i in range(len(mthd), method_tuple_length):
                    _l.append("")
                mthd = tuple(_l)
            conv_dict[mthd] = v
        return conv_dict
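
    # Example with hypothetical method names:
    #
    #     _correct_method_index([("IPCC", "GWP100"), ("ReCiPe",)])
    #     ->  {("IPCC", "GWP100"): 0, ("ReCiPe", ""): 1}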

    def _contribution_index_cols(self, **kwargs) -> Tuple[dict, Optional[Iterable]]:
        """Return the column index to use: reference flows when a method is
        given, impact methods otherwise."""
        if kwargs.get("method") is not None:
            return self.mlca.fu_index, self.act_fields
        return self._correct_method_index(self.mlca.methods), None

    def top_elementary_flow_contributions(
        self,
        functional_unit: Optional[tuple] = None,
        method: Optional[tuple] = None,
        aggregator: Union[str, list, None] = None,
        limit: int = 5,
        normalize: bool = False,
        limit_type: str = "number",
        total_range: bool = True,
        **kwargs,
    ) -> pd.DataFrame:
        """Return top EF contributions for either functional_unit or method.

        * If functional_unit: Compare the unit against all considered impact
          assessment methods.
        * If method: Compare the method against all involved processes.

        Parameters
        ----------
        functional_unit : The reference flow to compare all considered impact categories against
        method : The method to compare all considered reference flows against
        aggregator : Used to aggregate EF contributions over certain columns
        limit : The number of top contributions to consider
        normalize : Determines whether or not to normalize the contribution values
        limit_type : The type of limit, either 'number' or 'percent'
        total_range : Whether the total for contributions is the range (True) or the score (False)

        Returns
        -------
        Annotated top-contribution dataframe

        """
        contributions = self.get_contributions(
            self.EF, functional_unit, method, **kwargs
        )

        x_fields = self._contribution_rows(self.EF, aggregator)
        index, y_fields = self._contribution_index_cols(
            functional_unit=functional_unit, method=method
        )
        contributions, rev_index, mask = self.aggregate_by_parameters(
            contributions, self.BIOS, aggregator
        )

        # Normalise if required
        if normalize:
            contributions = self.normalize(contributions)

        top_cont_dict = self._build_dict(
            contributions, index, rev_index, limit, limit_type, total_range
        )
        labelled_df = self.get_labelled_contribution_dict(
            top_cont_dict, x_fields=x_fields, y_fields=y_fields, mask=mask
        )
        self.adjust_table_unit(labelled_df, method)
        return labelled_df
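
    # The two calling modes, with hypothetical arguments:
    #
    #     # one reference flow compared across all impact categories:
    #     contributions.top_elementary_flow_contributions(
    #         functional_unit=mlca.func_key_list[0]
    #     )
    #     # one impact category compared across all reference flows:
    #     contributions.top_elementary_flow_contributions(method=mlca.methods[0])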

    def top_process_contributions(
        self,
        functional_unit: Optional[tuple] = None,
        method: Optional[tuple] = None,
        aggregator: Union[str, list, None] = None,
        limit: int = 5,
        normalize: bool = False,
        limit_type: str = "number",
        total_range: bool = True,
        **kwargs,
    ) -> pd.DataFrame:
        """Return top process contributions for functional_unit or method.

        * If functional_unit: Compare the process against all considered impact
          assessment methods.
        * If method: Compare the method against all involved processes.

        Parameters
        ----------
        functional_unit : The reference flow to compare all considered impact categories against
        method : The method to compare all considered reference flows against
        aggregator : Used to aggregate process contributions over certain columns
        limit : The number of top contributions to consider
        normalize : Determines whether or not to normalize the contribution values
        limit_type : The type of limit, either 'number' or 'percent'
        total_range : Whether the total for contributions is the range (True) or the score (False)

        Returns
        -------
        Annotated top-contribution dataframe

        """
        contributions = self.get_contributions(
            self.ACT, functional_unit, method, **kwargs
        )

        x_fields = self._contribution_rows(self.ACT, aggregator)
        index, y_fields = self._contribution_index_cols(
            functional_unit=functional_unit, method=method
        )
        contributions, rev_index, mask = self.aggregate_by_parameters(
            contributions, self.TECH, aggregator
        )

        # Normalise if required
        if normalize:
            contributions = self.normalize(contributions)

        top_cont_dict = self._build_dict(
            contributions, index, rev_index, limit, limit_type, total_range
        )
        labelled_df = self.get_labelled_contribution_dict(
            top_cont_dict, x_fields=x_fields, y_fields=y_fields, mask=mask
        )
        self.adjust_table_unit(labelled_df, method)
        return labelled_df


def ids_to_keys(index_list):
    return [bd.get_activity(i).key if isinstance(i, int) else i for i in index_list]
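

# Example with hypothetical inputs: integer ids (bw25-style) are converted to
# (database, code) keys, anything already key-like passes through unchanged,
#
#     ids_to_keys([42, ("db_a", "code_1")])
#     ->  [bd.get_activity(42).key, ("db_a", "code_1")]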