• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpenCOMPES / sed / 6916933446

18 Nov 2023 11:32PM UTC coverage: 90.564%. First build
6916933446

Pull #264

github

rettigl
implement changes from PR #239: buffer file consistency check and tests into SXP loader
Pull Request #264: SXP loader

351 of 398 new or added lines in 5 files covered. (88.19%)

5471 of 6041 relevant lines covered (90.56%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.1
/sed/loader/utils.py
1
"""Utilities for loaders
2
"""
3
from glob import glob
from typing import cast
from typing import List
from typing import Optional
from typing import Sequence
from typing import Union

import dask.dataframe
import numpy as np
import pandas as pd
from h5py import File
from h5py import Group
from natsort import natsorted
15

16

17
def gather_files(
    folder: str,
    extension: str,
    f_start: Optional[int] = None,
    f_end: Optional[int] = None,
    f_step: int = 1,
    file_sorting: bool = True,
) -> List[str]:
    """Collects and sorts files with specified extension from a given folder.

    Args:
        folder (str): The folder to search
        extension (str):  File extension used for glob.glob().
        f_start (int, optional): Start file id used to construct a file selector.
            Defaults to None.
        f_end (int, optional): End file id used to construct a file selector.
            Defaults to None.
        f_step (int, optional): Step of file id incrementation, used to construct
            a file selector. Defaults to 1.
        file_sorting (bool, optional): Option to sort the files by their names.
            Defaults to True.

    Returns:
        List[str]: List of collected file names.
    """
    try:
        files = glob(folder + "/*." + extension)

        if file_sorting:
            files = cast(List[str], natsorted(files))

        # The file selector is applied only when both bounds are given; a lone
        # f_start or f_end is deliberately ignored.
        if f_start is not None and f_end is not None:
            files = files[slice(f_start, f_end, f_step)]

    except FileNotFoundError:
        print("No legitimate folder address is specified for file retrieval!")
        raise

    return files
56

57

58
def parse_h5_keys(h5_file: File, prefix: str = "") -> List[str]:
    """Recursively collect the channel (dataset) paths contained in an H5 file.

    Args:
        h5_file (h5py.File): The H5 file (or group) object to traverse.
        prefix (str, optional): Path prefix prepended to each channel name.
            Defaults to an empty string.

    Returns:
        List[str]: The "/"-separated paths of all datasets below ``h5_file``.

    Raises:
        KeyError: If a key cannot be resolved while traversing the file.
    """
    channels: List[str] = []

    for key in h5_file.keys():
        path = prefix + "/" + key
        try:
            node = h5_file[key]
            if isinstance(node, Group):
                # Descend into sub-groups; errors raised there are re-wrapped
                # here as well, so the reported path grows with each level.
                channels.extend(parse_h5_keys(node, prefix=path))
            else:
                # Leaf node: a dataset, i.e. an actual channel.
                channels.append(path)
        except KeyError as exception:
            raise KeyError(
                f"Error parsing key: {prefix}/{key}",
            ) from exception

    return channels
97

98

99
def split_channel_bitwise(
    df: dask.dataframe.DataFrame,
    input_column: str,
    output_columns: Sequence[str],
    bit_mask: int,
    overwrite: bool = False,
    types: Optional[Sequence[type]] = None,
) -> dask.dataframe.DataFrame:
    """Splits a channel into two channels bitwise.

    This function splits a channel into two channels by separating the first n bits from
    the remaining bits. The first n bits are stored in the first output column, the
    remaining bits are stored in the second output column.

    Args:
        df (dask.dataframe.DataFrame): Dataframe to use.
        input_column (str): Name of the column to split.
        output_columns (Sequence[str]): Names of the columns to create.
        bit_mask (int): Bit mask to use for splitting.
        overwrite (bool, optional): Whether to overwrite existing columns.
            Defaults to False.
        types (Sequence[type], optional): Types of the new columns. Defaults to
            np.int8 (or np.int16 for bit_mask >= 8) and np.int32.

    Raises:
        ValueError: If the number of output columns or types is not two, or
            bit_mask is not a non-negative integer.
        KeyError: If the input column is missing, or an output column already
            exists while overwrite is False.

    Returns:
        dask.dataframe.DataFrame: Dataframe with the new columns.
    """
    if len(output_columns) != 2:
        raise ValueError("Exactly two output columns must be given.")
    if input_column not in df.columns:
        raise KeyError(f"Column {input_column} not in dataframe.")
    if output_columns[0] in df.columns and not overwrite:
        raise KeyError(f"Column {output_columns[0]} already in dataframe.")
    if output_columns[1] in df.columns and not overwrite:
        raise KeyError(f"Column {output_columns[1]} already in dataframe.")
    # Check the type before the value: comparing a non-numeric bit_mask to 0
    # would raise a TypeError instead of the intended ValueError.
    if not isinstance(bit_mask, int) or bit_mask < 0:
        raise ValueError("bit_mask must be a positive integer.")
    if types is None:
        # np.int8 holds the masked values only while they fit into 7 bits.
        types = [np.int8 if bit_mask < 8 else np.int16, np.int32]
    elif len(types) != 2:
        raise ValueError("Exactly two types must be given.")
    elif not all(isinstance(t, type) for t in types):
        raise ValueError("types must be a sequence of types.")
    # Low bit_mask bits go to the first column, the remaining high bits to the second.
    df[output_columns[0]] = (df[input_column] % 2**bit_mask).astype(types[0])
    df[output_columns[1]] = (df[input_column] // 2**bit_mask).astype(types[1])
    return df
144

145

146
def split_dld_time_from_sector_id(
    df: Union[pd.DataFrame, dask.dataframe.DataFrame],
    tof_column: Optional[str] = None,
    sector_id_column: Optional[str] = None,
    sector_id_reserved_bits: Optional[int] = None,
    config: Optional[dict] = None,
) -> Union[pd.DataFrame, dask.dataframe.DataFrame]:
    """Converts the 8s time in steps to time in steps and sectorID.

    The 8s detector encodes the dldSectorID in the 3 least significant bits of the
    dldTimeSteps channel.

    Args:
        df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
        tof_column (str, optional): Name of the column containing the
            time-of-flight steps. Defaults to config["dataframe"]["tof_column"].
        sector_id_column (str, optional): Name of the column containing the
            sectorID. Defaults to config["dataframe"]["sector_id_column"].
        sector_id_reserved_bits (int, optional): Number of bits reserved for the
            sectorID. Defaults to config["dataframe"]["sector_id_reserved_bits"].
        config (dict, optional): Configuration dictionary. Defaults to None.

    Raises:
        ValueError: If a parameter is neither given directly nor resolvable from
            the config, or if the sector_id_column is already present.

    Returns:
        Union[pd.DataFrame, dask.dataframe.DataFrame]: Dataframe with the new columns.
    """
    # Each parameter falls back to the config; a missing config is an error.
    if tof_column is None:
        if config is None:
            raise ValueError("Either tof_column or config must be given.")
        tof_column = config["dataframe"]["tof_column"]
    if sector_id_column is None:
        if config is None:
            raise ValueError("Either sector_id_column or config must be given.")
        sector_id_column = config["dataframe"]["sector_id_column"]
    if sector_id_reserved_bits is None:
        if config is None:
            raise ValueError("Either sector_id_reserved_bits or config must be given.")
        sector_id_reserved_bits = config["dataframe"].get("sector_id_reserved_bits", None)
        if sector_id_reserved_bits is None:
            raise ValueError('No value for "sector_id_reserved_bits" found in config.')

    # Guard against double application: splitting an already-split column
    # would corrupt the time-of-flight values.
    if sector_id_column in df.columns:
        raise ValueError(
            f"Column {sector_id_column} already in dataframe. This function is not idempotent.",
        )
    # The sectorID occupies the reserved low bits; the remaining high bits are
    # the actual time-of-flight steps, written back into tof_column.
    df = split_channel_bitwise(
        df=df,
        input_column=tof_column,
        output_columns=[sector_id_column, tof_column],
        bit_mask=sector_id_reserved_bits,
        overwrite=True,
        types=[np.int8, np.int32],
    )
    return df
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc