/sed/binning/binning.py
"""This module contains the binning functions of the sed.binning module."""
import gc
from functools import reduce
from typing import cast
from typing import List
from typing import Sequence
from typing import Tuple
from typing import Union

import dask.dataframe
import numpy as np
import pandas as pd
import psutil
import xarray as xr
from threadpoolctl import threadpool_limits
from tqdm.auto import tqdm

from .numba_bin import numba_histogramdd
from .utils import _arraysum
from .utils import bin_centers_to_bin_edges
from .utils import simplify_binning_arguments

N_CPU = psutil.cpu_count()


def bin_partition(
    part: Union[dask.dataframe.DataFrame, pd.DataFrame],
    bins: Union[
        int,
        dict,
        Sequence[int],
        Sequence[np.ndarray],
        Sequence[tuple],
    ] = 100,
    axes: Sequence[str] = None,
    ranges: Sequence[Tuple[float, float]] = None,
    hist_mode: str = "numba",
    jitter: Union[list, dict] = None,
    return_edges: bool = False,
    skip_test: bool = False,
) -> Union[np.ndarray, Tuple[np.ndarray, list]]:
    """Compute the n-dimensional histogram of a single dataframe partition.

    Args:
        part (Union[dask.dataframe.DataFrame, pd.DataFrame]): dataframe on which
            to perform the histogram. Usually a partition of a dask DataFrame.
        bins (int, dict, Sequence[int], Sequence[np.ndarray], Sequence[tuple], optional):
            Definition of the bins. Can be any of the following cases:

                - an integer describing the number of bins for all dimensions. This
                  requires "ranges" to be defined as well.
                - A sequence containing one entry of the following types for each
                  dimension:

                    - an integer describing the number of bins. This requires "ranges"
                      to be defined as well.
                    - a np.ndarray defining the bin centers
                    - a tuple of 3 numbers describing start, end and step of the binning
                      range.

                - a dictionary made of the axes as keys and any of the above as
                  values.

            The last option takes priority over the axes and ranges arguments.
            Defaults to 100.
        axes (Sequence[str], optional): Sequence containing the names of
            the axes (columns) on which to calculate the histogram. The order will be
            the order of the dimensions in the resulting array. Only required if
            bins are not provided as a dictionary containing the axis names.
            Defaults to None.
        ranges (Sequence[Tuple[float, float]], optional): Sequence of tuples containing
            the start and end point of the binning range. Required if bins are given as
            int or Sequence[int]. Defaults to None.
        hist_mode (str, optional): Histogram calculation method.

                - "numpy": use ``numpy.histogramdd``,
                - "numba": use a numba-powered equivalent method.

            Defaults to "numba".
        jitter (Union[list, dict], optional): a list of the axes on which to apply
            jittering. To specify the jitter amplitude or method (normal or uniform
            noise) a dictionary can be passed. This should look like
            jitter={'axis':{'amplitude':0.5,'mode':'uniform'}}.
            This example also shows the default behaviour, in case None is
            passed in the dictionary, or jitter is a list of strings.
            Warning: this is not the most performant approach. Applying jitter
            on the dataframe before calling the binning is much faster.
            Defaults to None.
        return_edges (bool, optional): If True, returns a list of D arrays
            describing the bin edges for each dimension, similar to the
            behaviour of ``np.histogramdd``. Defaults to False.
        skip_test (bool, optional): Turns off input check and data transformation.
            Defaults to False, as it is intended for internal use only.
            Warning: setting this to True might make error tracking difficult.

    Raises:
        ValueError: When the method requested is not available.
        AttributeError: if bins, axes and ranges are not congruent in dimensionality.
        KeyError: when the columns along which to compute the histogram are not
            present in the dataframe.

    Returns:
        Union[np.ndarray, Tuple[np.ndarray, list]]: 2-element tuple returned only when
        return_edges is True. Otherwise only hist is returned.

        - **hist**: The result of the n-dimensional binning
        - **edges**: A list of D arrays describing the bin edges for each dimension.
    """
    if not skip_test:
        bins, axes, ranges = simplify_binning_arguments(bins, axes, ranges)
    else:
        if not isinstance(bins, list) or not (
            all(isinstance(x, (int, np.int64)) for x in bins)
            or all(isinstance(x, np.ndarray) for x in bins)
        ):
            raise TypeError(
                "bins needs to be of type 'List[int] or List[np.ndarray]' if tests are skipped!",
            )
        if not (isinstance(axes, list)) or not all(isinstance(axis, str) for axis in axes):
            raise TypeError(
                "axes needs to be of type 'List[str]' if tests are skipped!",
            )
        bins = cast(Union[List[int], List[np.ndarray]], bins)
        axes = cast(List[str], axes)
        ranges = cast(List[Tuple[float, float]], ranges)

    # convert bin centers to bin edges:
    if all(isinstance(x, np.ndarray) for x in bins):
        bins = cast(List[np.ndarray], bins)
        for i, bin_centers in enumerate(bins):
            bins[i] = bin_centers_to_bin_edges(bin_centers)
    else:
        bins = cast(List[int], bins)
        # shift ranges by half a bin size to align the bin centers to the given ranges,
        # as the histogram functions interpret the ranges as limits for the edges.
        for i, nbins in enumerate(bins):
            halfbinsize = (ranges[i][1] - ranges[i][0]) / (nbins) / 2
            ranges[i] = (
                ranges[i][0] - halfbinsize,
                ranges[i][1] - halfbinsize,
            )

    # Locate columns for binning operation
    col_id = [part.columns.get_loc(axis) for axis in axes]

    if jitter is not None:
        sel_part = part[axes].copy()

        if isinstance(jitter, Sequence):
            jitter = {k: None for k in jitter}
        for col, jpars in jitter.items():
            if col in axes:
                if jpars is None:
                    jpars = {}
                amp = jpars.get("amplitude", 0.5)
                mode = jpars.get("mode", "uniform")
                ax_index = axes.index(col)
                _bin = bins[ax_index]
                if isinstance(_bin, (int, np.int64)):
                    rng = ranges[ax_index]
                    binsize = abs(rng[1] - rng[0]) / _bin
                else:
                    binsize = abs(_bin[0] - _bin[1])
                    assert np.allclose(
                        binsize,
                        abs(_bin[-3] - _bin[-2]),
                    ), f"bins along {col} are not uniform. Cannot apply jitter."
                apply_jitter_on_column(sel_part, amp * binsize, col, mode)
        vals = sel_part.values
    else:
        vals = part.values[:, col_id]
    if hist_mode == "numba":
        hist_partition, edges = numba_histogramdd(
            vals,
            bins=bins,
            ranges=ranges,
        )
    elif hist_mode == "numpy":
        hist_partition, edges = np.histogramdd(
            vals,
            bins=bins,
            range=ranges,
        )
    else:
        raise ValueError(
            f"No binning method {hist_mode} available. Please choose between numba and numpy.",
        )

    if return_edges:
        return hist_partition, edges

    return hist_partition
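

# Usage sketch (not part of the module): a hypothetical call on a small pandas
# DataFrame with illustrative column names "x" and "y":
#
#     part = pd.DataFrame(np.random.rand(1000, 2), columns=["x", "y"])
#     hist, edges = bin_partition(
#         part,
#         bins=[10, 20],
#         axes=["x", "y"],
#         ranges=[(0.0, 1.0), (0.0, 1.0)],
#         return_edges=True,
#     )
#     # hist.shape == (10, 20); edges holds the two bin-edge arrays.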


def bin_dataframe(
    df: dask.dataframe.DataFrame,
    bins: Union[
        int,
        dict,
        Sequence[int],
        Sequence[np.ndarray],
        Sequence[tuple],
    ] = 100,
    axes: Sequence[str] = None,
    ranges: Sequence[Tuple[float, float]] = None,
    hist_mode: str = "numba",
    mode: str = "fast",
    jitter: Union[list, dict] = None,
    pbar: bool = True,
    n_cores: int = N_CPU - 1,
    threads_per_worker: int = 4,
    threadpool_api: str = "blas",
    return_partitions: bool = False,
    **kwds,
) -> xr.DataArray:
    """Computes the n-dimensional histogram on columns of a dataframe,
219
    parallelized.
220

221
    Args:
222
        df (dask.dataframe.DataFrame): a dask.DataFrame on which to perform the
223
            histogram.
224
            bins (int, dict, Sequence[int], Sequence[np.ndarray], Sequence[tuple], optional):
225
            Definition of the bins. Can be any of the following cases:
226

227
                - an integer describing the number of bins for all dimensions. This
228
                  requires "ranges" to be defined as well.
229
                - A sequence containing one entry of the following types for each
230
                  dimenstion:
231

232
                    - an integer describing the number of bins. This requires "ranges"
233
                      to be defined as well.
234
                    - a np.arrays defining the bin centers
235
                    - a tuple of 3 numbers describing start, end and step of the binning
236
                      range.
237

238
                - a dictionary made of the axes as keys and any of the above as
239
                  values.
240

241
            The last option takes priority over the axes and range arguments.
242
            Defaults to 100.
243
        axes (Sequence[str], optional): Sequence containing the names of
244
            the axes (columns) on which to calculate the histogram. The order will be
245
            the order of the dimensions in the resulting array. Only not required if
246
            bins are provided as dictionary containing the axis names.
247
            Defaults to None.
248
        ranges (Sequence[Tuple[float, float]], optional): Sequence of tuples containing
249
            the start and end point of the binning range. Required if bins given as
250
            int or Sequence[int]. Defaults to None.
251
        hist_mode (str, optional): Histogram calculation method.
252

253
                - "numpy": use ``numpy.histogramdd``,
254
                - "numba" use a numba powered similar method.
255

256
            Defaults to "numba".
257
        mode (str, optional): Defines how the results from each partition are combined.
258

259
                - 'fast': Uses parallelized recombination of results.
260
                - 'lean': Store all partition results in a list, and recombine at the
261
                  end.
262
                - 'legacy': Single-core recombination of partition results.
263

264
            Defaults to "fast".
265
        jitter (Union[list, dict], optional): a list of the axes on which to apply
266
            jittering. To specify the jitter amplitude or method (normal or uniform
267
            noise) a dictionary can be passed. This should look like
268
            jitter={'axis':{'amplitude':0.5,'mode':'uniform'}}.
269
            This example also shows the default behaviour, in case None is
270
            passed in the dictionary, or jitter is a list of strings.
271
            Warning: this is not the most performing approach. applying jitter
272
            on the dataframe before calling the binning is much faster.
273
            Defaults to None.
274
        pbar (bool, optional): Option to show the tqdm progress bar. Defaults to True.
275
        n_cores (int, optional): Number of CPU cores to use for parallelization.
276
            Defaults to all but one of the available cores. Defaults to N_CPU-1.
277
        threads_per_worker (int, optional): Limit the number of threads that
278
            multiprocessing can spawn. Defaults to 4.
279
        threadpool_api (str, optional): The API to use for multiprocessing.
280
            Defaults to "blas".
281
        return_partitions (bool, optional): Option to return a hypercube of dimension
282
            n+1, where the last dimension corresponds to the dataframe partitions.
283
            Defaults to False.
284
        **kwds: Keyword arguments passed to ``dask.compute()``
285

286
    Raises:
287
        Warning: Warns if there are unimplemented features the user is trying to use.
288
        ValueError: Raised when there is a mismatch in dimensions between the
289
            binning parameters.
290

291
    Returns:
292
        xr.DataArray: The result of the n-dimensional binning represented in an
293
        xarray object, combining the data with the axes (bin centers).
294
    """
    bins, axes, ranges = simplify_binning_arguments(bins, axes, ranges)

    # create the coordinate axes for the xarray output
    # if provided as array, they are interpreted as bin centers
    if isinstance(bins[0], np.ndarray):
        bins = cast(List[np.ndarray], bins)
        coords = dict(zip(axes, bins))
    elif ranges is None:
        raise ValueError(
            "bins is not an array and ranges is None. This shouldn't happen.",
        )
    else:
        bins = cast(List[int], bins)
        coords = {
            ax: np.linspace(r[0], r[1], n, endpoint=False) for ax, r, n in zip(axes, ranges, bins)
        }

    full_shape = tuple(axis.size for axis in coords.values())

    full_result = np.zeros(full_shape)
    partition_results = []  # Partition-level results

    # limit multithreading in worker threads
    with threadpool_limits(limits=threads_per_worker, user_api=threadpool_api):

        # Main loop for binning
        for i in tqdm(range(0, df.npartitions, n_cores), disable=not pbar):

            core_tasks = []  # Core-level jobs
            for j in range(0, n_cores):

                partition_index = i + j
                if partition_index >= df.npartitions:
                    break

                df_partition = df.get_partition(
                    partition_index,
                )  # Obtain dataframe partition
                core_tasks.append(
                    dask.delayed(bin_partition)(
                        df_partition,
                        bins=bins,
                        axes=axes,
                        ranges=ranges,
                        hist_mode=hist_mode,
                        jitter=jitter,
                        skip_test=True,
                        return_edges=False,
                    ),
                )

            if len(core_tasks) > 0:
                core_results = dask.compute(*core_tasks, **kwds)

                if return_partitions:
                    for core_result in core_results:
                        partition_results.append(core_result)
                    del core_results

                elif mode == "legacy":
                    # Combine all core results for a dataframe partition
                    partition_result = np.zeros_like(core_results[0])
                    for core_result in core_results:
                        partition_result += core_result

                    partition_results.append(partition_result)

                elif mode == "lean":
                    # Combine all core results for a dataframe partition
                    partition_result = reduce(_arraysum, core_results)
                    full_result += partition_result
                    del partition_result
                    del core_results

                elif mode == "fast":
                    combine_tasks = []
                    for j in range(0, n_cores):
                        combine_parts = []
                        # split results along the first dimension among worker
                        # threads
                        for core_result in core_results:
                            combine_parts.append(
                                core_result[
                                    int(j * full_shape[0] / n_cores) : int(
                                        (j + 1) * full_shape[0] / n_cores,
                                    ),
                                    ...,
                                ],
                            )
                        combine_tasks.append(
                            dask.delayed(reduce)(_arraysum, combine_parts),
                        )
                    combine_results = dask.compute(*combine_tasks, **kwds)
                    # Directly fill into target array. This is much faster than
                    # the (not so parallel) reduce/concatenation used before,
                    # and uses less memory.

                    for j in range(0, n_cores):
                        full_result[
                            int(j * full_shape[0] / n_cores) : int(
                                (j + 1) * full_shape[0] / n_cores,
                            ),
                            ...,
                        ] += combine_results[j]
                    del combine_parts
                    del combine_tasks
                    del combine_results
                    del core_results
                else:
                    raise ValueError(f"Could not interpret mode {mode}")

            del core_tasks

    if return_partitions:
        coords = {**coords, **{"df_part": np.arange(df.npartitions)}}
        dims = list(axes)
        dims.append("df_part")
        data_array = xr.DataArray(
            data=np.stack(partition_results, axis=-1).astype("float32"),
            coords=coords,
            dims=dims,
        )

    else:
        if mode == "legacy":
            # still need to combine all partition results
            full_result = np.zeros_like(partition_results[0])
            for partition_result in partition_results:
                full_result += np.nan_to_num(partition_result)

        data_array = xr.DataArray(
            data=full_result.astype("float32"),
            coords=coords,
            dims=list(axes),
        )

    gc.collect()
    return data_array
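

# Usage sketch (not part of the module; column names and bin tuples are
# illustrative). Using (start, end, step) tuples avoids a separate ranges
# argument:
#
#     ddf = dask.dataframe.from_pandas(
#         pd.DataFrame(np.random.rand(100_000, 2), columns=["x", "y"]),
#         npartitions=4,
#     )
#     result = bin_dataframe(
#         ddf,
#         bins={"x": (0.0, 1.0, 0.01), "y": (0.0, 1.0, 0.05)},
#     )
#     # result is an xr.DataArray with dims ("x", "y") and bin centers as coords.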


def normalization_histogram_from_timestamps(
    df: dask.dataframe.DataFrame,
    axis: str,
    bin_centers: np.ndarray,
    time_stamp_column: str,
) -> xr.DataArray:
    """Get a normalization histogram from the time stamps column in the dataframe.

    Args:
        df (dask.dataframe.DataFrame): a dask.DataFrame on which to perform the
            histogram.
        axis (str): The axis (dataframe column) on which to calculate the normalization
            histogram.
        bin_centers (np.ndarray): Bin centers used for binning of the axis.
        time_stamp_column (str): Dataframe column containing the time stamps.

    Returns:
        xr.DataArray: Calculated normalization histogram.
    """
    time_per_electron = df[time_stamp_column].diff()

    bins = df[axis].map_partitions(
        pd.cut,
        bins=bin_centers_to_bin_edges(bin_centers),
    )

    histogram = time_per_electron.groupby([bins]).sum().compute().values

    data_array = xr.DataArray(
        data=histogram,
        coords={axis: bin_centers},
    )

    return data_array
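

# Usage sketch (hypothetical column names): accumulate per-electron time
# differences from a "timeStamps" column into bins along a "delay" axis:
#
#     hist = normalization_histogram_from_timestamps(
#         df,
#         axis="delay",
#         bin_centers=np.linspace(-1.0, 1.0, 50),
#         time_stamp_column="timeStamps",
#     )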


def normalization_histogram_from_timed_dataframe(
    df: dask.dataframe.DataFrame,
    axis: str,
    bin_centers: np.ndarray,
    time_unit: float,
) -> xr.DataArray:
    """Get a normalization histogram from a timed dataframe.

    Args:
        df (dask.dataframe.DataFrame): a dask.DataFrame on which to perform the
            histogram. Entries should be based on an equal time unit.
        axis (str): The axis (dataframe column) on which to calculate the normalization
            histogram.
        bin_centers (np.ndarray): Bin centers used for binning of the axis.
        time_unit (float): Time unit the data frame entries are based on.

    Returns:
        xr.DataArray: Calculated normalization histogram.
    """
    bins = df[axis].map_partitions(
        pd.cut,
        bins=bin_centers_to_bin_edges(bin_centers),
    )

    histogram = df[axis].groupby([bins]).count().compute().values * time_unit
    # histogram = bin_dataframe(df, axes=[axis], bins=[bin_centers]) * time_unit

    data_array = xr.DataArray(
        data=histogram,
        coords={axis: bin_centers},
    )

    return data_array
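

# Usage sketch (hypothetical): for a timed dataframe with one row per fixed
# time slot, count rows per "delay" bin and scale the counts by the time unit:
#
#     hist = normalization_histogram_from_timed_dataframe(
#         timed_df,
#         axis="delay",
#         bin_centers=np.linspace(-1.0, 1.0, 50),
#         time_unit=0.001,  # e.g. 1 ms per row
#     )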


def apply_jitter_on_column(
    df: Union[dask.dataframe.core.DataFrame, pd.DataFrame],
    amp: float,
    col: str,
    mode: str = "uniform",
):
    """Add jittering to a column of a dataframe.

    Args:
        df (Union[dask.dataframe.core.DataFrame, pd.DataFrame]): Dataframe to add
            noise/jittering to.
        amp (float): Amplitude scaling for the jittering noise.
        col (str): Name of the column to add jittering to.
        mode (str, optional): Choose between 'uniform' for uniformly
            distributed noise, or 'normal' for noise with normal distribution.
            For columns with digital values, one should choose 'uniform' with
            an amplitude (amp) equal to the step size. Defaults to "uniform".
    """
    colsize = df[col].size
    if mode == "uniform":
        # Uniform jitter distribution
        df[col] += amp * np.random.uniform(low=-1, high=1, size=colsize)
    elif mode == "normal":
        # Normal jitter distribution works better for non-linear
        # transformations and jitter sizes that don't match the original bin
        # sizes
        df[col] += amp * np.random.standard_normal(size=colsize)
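

# Usage sketch (hypothetical): dither a digitized column in place. With
# mode="uniform", amp scales noise drawn from [-1, 1], so amp=0.5 spreads
# values over a unit step:
#
#     frame = pd.DataFrame(
#         {"tof": np.random.randint(0, 100, 1000).astype(float)},
#     )
#     apply_jitter_on_column(frame, amp=0.5, col="tof", mode="uniform")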