• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iprafols / stacking / 9285461052

29 May 2024 11:43AM UTC coverage: 99.669% (-0.3%) from 100.0%
9285461052

push

github

iprafols
fixed test suite

487 of 489 branches covered (99.59%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 1 file covered. (100.0%)

4 existing lines in 2 files now uncovered.

1318 of 1322 relevant lines covered (99.7%)

2.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.92
/stacking/normalizers/multiple_regions_normalization_utils.py
1
""" This module define utility functions for class MultipleRegionsNornalization"""
2
from datetime import datetime
3✔
3

4
from astropy.io import fits
3✔
5
from numba import njit, prange
3✔
6
import numpy as np
3✔
7

8
from stacking._version import __version__
3✔
9

10

11
@njit
3✔
12
def compute_norm_factors(flux,
3✔
13
                         ivar,
14
                         wavelength,
15
                         num_intervals,
16
                         intervals,
17
                         sigma_i2=0.0025):
18
    """ Compute the normalisation factors for the specified intervals.
19

20
    The normalisation factor, n, is computed doing the average of the fluxes, f_i,
21
    inside the normalisation interval:
22
        n = sum_j f_j w_j / sum_j w_j
23
    where w_j is the weight of pixel j computed as  w = 1 / (sigma_j^2 + sigma_I^2),
24
    sigma_j is the variance of pixel j, and sigma_I is an added variance to prevent
25
    the highest signal-to-noise pixels to dominate the sum.
26

27
    The normalisation signal-to-noise, s, is computed dividing the normalisation
28
    factor, n, by sum in quadrature of the errors, e_i:
29
        s = n / sqrt(sum_j e_j^2 w_j/ sum_j w_j)
30

31
    Pixels where ivar is set to 0 are ignored in these calculations.
32

33
    Arguments
34
    ---------
35
    flux: array of float
36
    The flux array
37

38
    ivar: array of float
39
    The inverse variance associated with the flux
40

41
    wavelength: array of float
42
    The wavelength array
43

44
    num_intervals: int
45
    The number of intervals
46

47
    intervals: array of (float, float)
48
    Array containing the selected intervals. Each item must contain
49
    two floats signaling the starting and ending wavelength of the interval.
50
    Naturally, the starting wavelength must be smaller than the ending wavelength.
51

52
    sigma_i2: float - Default: 0.0025
53
    A correction to the weights so that pixels with very small variance do not
54
    dominate. Weights are computed as w = 1 / (sigma^2 + sigma_i^2)
55

56
    Return
57
    ------
58
    results: array of float
59
    Array of size `4*num_intervals`. Indexs 3X contain the normalization
60
    factor of the Xth region, indexs 3X + 1 contain the normalization
61
    signal to noise of the Xth region, indexs 3X + 2 contain the number of
62
    pixels used to compute the normalization of the Xth region, and indexs 3X + 3
63
    contain the total weight used in the computation of the normalization factor
64
    """
65
    # normalization factors occupy the indexs 3X
66
    # normalization signal-to-noise occupy indexs 3X + 1
67
    # number of pixels occupy indexs 3X + 2
68
    results = np.zeros(4 * num_intervals, dtype=np.float64)
3✔
69

70
    # Disabling pylint warning, see https://github.com/PyCQA/pylint/issues/2910
71
    for index in prange(num_intervals):  # pylint: disable=not-an-iterable
3✔
72
        pos = np.where((wavelength >= intervals[index][0]) &
3✔
73
                       (wavelength <= intervals[index][1]) & (ivar != 0.0))
74
        # number of pixels
75
        num_pixels = float(pos[0].size)
3✔
76
        results[4 * index + 2] = num_pixels
3✔
77

78
        if num_pixels == 0:
3✔
79
            # norm factor
80
            results[4 * index] = np.nan
3✔
81
            # norm sn
82
            results[4 * index + 1] = np.nan
3✔
83
            # weight
84
            results[4 * index + 3] = np.nan
3✔
85
        else:
86
            # weight
87
            weight = ivar[pos]
3✔
88
            if not np.isclose(sigma_i2, 0.0):
3✔
89
                weight /= (1 + sigma_i2 * ivar[pos])
3✔
90
            total_weight = np.sum(weight)
3✔
91

92
            norm_factor = np.sum(flux[pos] * weight) / total_weight
3✔
93

94
            mean_noise = np.sqrt(np.sum(weight / ivar[pos]) / total_weight)
3✔
95

96
            # keep the results
97
            if norm_factor > 0:
3✔
98
                # norm factor
99
                results[4 * index] = norm_factor
3✔
100
                # norm sn
101
                results[4 * index + 1] = norm_factor / mean_noise
3✔
102
                # weight
103
                results[4 * index + 3] = total_weight
3✔
104
            else:
105
                # norm factor
106
                results[4 * index] = np.nan
3✔
107
                # norm sn
108
                results[4 * index + 1] = np.nan
3✔
109
                # weight
110
                results[4 * index + 3] = np.nan
3✔
111

112
    return results
3✔
113

114

115
def save_correction_factors_ascii(filename, correction_factors):
3✔
116
    """ Save the correction factors in an ascii file
117

118
    Arguments
119
    ---------
120
    filename: str
121
    Name of the file
122

123
    correction_factors: array of float
124
    Correction factors that relate the different intervals
125
    """
126
    with open(filename, "w", encoding="utf-8") as results:
3✔
127
        results.write("# interval correction_factor\n")
3✔
128
        for index, correction_factor in enumerate(correction_factors):
3✔
129
            results.write(f"{index} {correction_factor}\n")
3✔
130

131

132
def save_norm_factors_ascii(filename, norm_factors):
3✔
133
    """ Save the normalisation factors in an ascii file
134

135
    Arguments
136
    ---------
137
    filename: str
138
    Name of the file
139

140
    norm_factors: pd.DataFrame
141
    Pandas DataFrame containing the normalization factors
142
    """
143
    norm_factors.to_csv(
3✔
144
        filename,
145
        index=False,
146
        float_format='%.6f',
147
        encoding="utf-8",
148
        sep=" ",
149
        na_rep="nan",
150
    )
151

152

153
def save_norm_factors_fits(filename, norm_factors, intervals,
3✔
154
                           correction_factors):
155
    """ Save the normalisation factors, the normalization intervals and the
156
    correction factors in a fits file
157

158
    Arguments
159
    ---------
160
    filename: str
161
    Name of the file
162

163
    norm_factors: pd.DataFrame
164
    Pandas DataFrame containing the normalization factors
165

166
    intervals:  array of (float, float)
167
    Array containing the selected intervals. Each item must contain
168
    two floats signaling the starting and ending wavelength of the interval.
169
    Naturally, the starting wavelength must be smaller than the ending wavelength.
170

171
    correction_factors: array of float
172
    Correction factors that relate the different intervals
173
    """
174
    # primary HDU
175
    primary_hdu = fits.PrimaryHDU()
3✔
176
    now = datetime.now()
3✔
177
    primary_hdu.header["COMMENT"] = (
3✔
178
        f"Normalisation factors computed using class {__name__}"
179
        f" of code stacking")
180
    primary_hdu.header["VERSION"] = (__version__, "Code version")
3✔
181
    primary_hdu.header["DATETIME"] = (now.strftime("%Y-%m-%dT%H:%M:%S"),
3✔
182
                                      "DateTime file created")
183

184
    # norm factors
185
    cols = [
3✔
186
        fits.Column(
187
            name=col, format="J", disp="I4", array=norm_factors[col].values)
188
        if "num pixels" in col or col == "specid" else fits.Column(
189
            name=col, format="E", disp="F7.3", array=norm_factors[col].values)
190
        for col in norm_factors.columns
191
    ]
192
    hdu = fits.BinTableHDU.from_columns(cols, name="NORM_FACTORS")
3✔
193
    # TODO: add description of columns
194

195
    # intervals used
196
    cols = [
3✔
197
        fits.Column(name="START",
198
                    format="E",
199
                    disp="F7.3",
200
                    array=intervals[:, 0]),
201
        fits.Column(name="END", format="E", disp="F7.3", array=intervals[:, 1]),
202
    ]
203
    hdu2 = fits.BinTableHDU.from_columns(cols, name="NORM_INTERVALS")
3✔
204
    # TODO: add description of columns
205

206
    # correction factors
207
    cols = [
3✔
208
        fits.Column(name="CORRECTION_FACTOR",
209
                    format="E",
210
                    disp="F7.3",
211
                    array=correction_factors),
212
        fits.Column(name="INTERVAL",
213
                    format="J",
214
                    disp="I4",
215
                    array=np.arange(correction_factors.size, dtype=int)),
216
    ]
217
    hdu3 = fits.BinTableHDU.from_columns(cols, name="CORRECTION_FACTORS")
3✔
218
    # TODO: add description of columns
219

220
    hdul = fits.HDUList([primary_hdu, hdu, hdu2, hdu3])
3✔
221
    hdul.writeto(filename, overwrite=True, checksum=True)
3✔
222

223

224
def save_norm_intervals_ascii(filename, intervals):
3✔
225
    """ Save the normalisation intervals in an ascii file
226

227
    Arguments
228
    ---------
229
    filename: str
230
    Name of the file
231

232
    intervals:  array of (float, float)
233
    Array containing the selected intervals. Each item must contain
234
    two floats signaling the starting and ending wavelength of the interval.
235
    Naturally, the starting wavelength must be smaller than the ending wavelength.
236
    """
237
    with open(filename, "w", encoding="utf-8") as results:
3✔
238
        results.write("# start end\n")
3✔
239
        for index in range(intervals.shape[0]):
3✔
240
            results.write(f"{intervals[index, 0]} {intervals[index, 1]}\n")
3✔
241

242

243
def select_final_normalisation_factor(row, correction_factors, min_norm_sn):
3✔
244
    """ Select the final normalisation factor
245

246
    This function should be called using pd.apply with axis=1
247

248
    Arguments
249
    ---------
250
    row: array
251
    A dataframe row with the normalisation factors. Should contain the columns
252
    'norm factor X', 'norm S/N X', 'num pixels X', where X = 0, 1, ... N and
253
    N is the size of intervals_corretion_factors
254

255
    correction_factors: array of float
256
    Correction factors to correct the chosen normalisation factors
257

258
    min_norm_sn: float
259
    Minimum signal to noise in the normalization interval. Intervals with lower
260
    signal to noise values are not selected and nans are returned
261

262
    Return
263
    ------
264
    norm_factor: float
265
    The selected norm factor. NaN if no valid normalization factor is found
266

267
    norm_sn: float
268
    The selected norm factor. NaN if no valid normalization factor is found
269

270
    selected_interval: int
271
    The selected interval. -1 if no valid interval was found
272
    """
273
    # select best interval
274
    cols = [f"norm S/N {index}" for index in range(correction_factors.size)]
3✔
275
    try:
3✔
276
        selected_interval = np.nanargmax(row[cols].values)
3✔
277
        norm_factor = row[
3✔
278
            f"norm factor {selected_interval}"] * correction_factors[
279
                selected_interval]
280
        norm_sn = row[f"norm S/N {selected_interval}"]
3✔
281
        if norm_sn < min_norm_sn:
3!
UNCOV
282
            norm_factor = np.nan
×
UNCOV
283
            norm_sn = np.nan
×
UNCOV
284
            selected_interval = -1
×
285

286
    except ValueError:
3✔
287
        norm_factor = np.nan
3✔
288
        norm_sn = np.nan
3✔
289
        selected_interval = -1
3✔
290

291
    return norm_factor, norm_sn, selected_interval
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc