• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

aburrell / pyIntensityFeatures / 14085128439

26 Mar 2025 01:57PM UTC coverage: 98.863%. First build
14085128439

Pull #1

github

aburrell
TST: added RTD yaml

Added a yaml for readthedocs and updated the Python version for doc and PyPi testing.
Pull Request #1: RC Version 0.1.0

103 of 105 new or added lines in 11 files covered. (98.1%)

2609 of 2639 relevant lines covered (98.86%)

10.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.73
/pyIntensityFeatures/proc/fitting.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# DISTRIBUTION STATEMENT A: Approved for public release. Distribution is
5
# unlimited.
6
# -----------------------------------------------------------------------------
7
"""Functions used for fitting functions to intensity data."""
8

9
import numpy as np
11✔
10
from scipy.optimize import leastsq
11✔
11
from scipy import stats
11✔
12

13
from pyIntensityFeatures.utils import checks
11✔
14
from pyIntensityFeatures.utils import distributions
11✔
15

16

17
def estimate_peak_widths(data, data_loc, peak_inds, peak_mags):
11✔
18
    """Estimate the full width at half max of the peaks in a curve.
19

20
    Parameters
21
    ----------
22
    data : array-like
23
        Input data (y-axis)
24
    data_loc : array-like
25
        Input data location (x-axis)
26
    peak_inds : array-like
27
        Indexes of the peaks
28
    peak_mags : array-like
29
        Magnitude of the peaks
30

31
    Returns
32
    -------
33
    peak_sigmas : list-like
34
        FWHM values for each peak, or the closest estimate possible
35

36
    Notes
37
    -----
38
    Differs from Longden, et al. (2010) function by not assuming a fixed data
39
    location increment of 1.0 degrees.
40

41
    """
42
    # Ensure the input is array-like
43
    data = np.asarray(data)
11✔
44
    data_loc = np.asarray(data_loc)
11✔
45
    peak_inds = np.asarray(peak_inds)
11✔
46
    peak_mags = np.asarray(peak_mags)
11✔
47

48
    # Get the half-max for each peak
49
    half_max = 0.5 * peak_mags
11✔
50
    peak_sigmas = list()
11✔
51

52
    for i, ipeak in enumerate(peak_inds):
11✔
53
        # Step through all values of the curve data (`data`) with lower indices
54
        # than this peak until either the half-max threshold or another peak is
55
        # reached
56
        reached = False
11✔
57
        good_down = False
11✔
58
        j = ipeak
11✔
59
        while not reached and j > 0:
11✔
60
            if (data[j] >= half_max[i]) and (data[j - 1] <= half_max[i]):
11✔
61
                reached = True
11✔
62
                fwhm_loc = data_loc[j - 1] + (
11✔
63
                    data_loc[j] - data_loc[j - 1]) * (
64
                        half_max[i] - data[j - 1]) / (data[j] - data[j - 1])
65
                width_down = data_loc[ipeak] - fwhm_loc
11✔
66
                good_down = True
11✔
67
            else:
68
                if data[j] < data[j - 1]:
11✔
69
                    # Data is no longer decreasing. Since the correct width was
70
                    # not reached before finding the desired half-max value,
71
                    # set an approximate value
72
                    reached = True
11✔
73
                    width_down = data_loc[ipeak] - data_loc[j - 1]
11✔
74
            j -= 1
11✔
75

76
        if not good_down and not reached:
11✔
77
            # Reached edge of data before defining the width
78
            width_down = data_loc[ipeak] - data_loc[j]
11✔
79

80
        # Step through all values of the curve data (`data`) with higher
81
        # indices than this peak until either the half-max threshold or another
82
        # peak is reached
83
        reached = False
11✔
84
        good_up = False
11✔
85
        j = ipeak
11✔
86
        while not reached and j < len(data) - 1:
11✔
87
            if (data[j] >= half_max[i]) and (data[j + 1] <= half_max[i]):
11✔
88
                reached = True
11✔
89
                fwhm_loc = data_loc[j + 1] + (
11✔
90
                    data_loc[j] - data_loc[j + 1]) * (
91
                        half_max[i] - data[j + 1]) / (data[j] - data[j + 1])
92
                width_up = fwhm_loc - data_loc[ipeak]
11✔
93
                good_up = True
11✔
94
            else:
95
                if data[j] < data[j + 1]:
11✔
96
                    # Data is no longer decreasing. Since the correct width was
97
                    # not reached before finding the desired half-max value,
98
                    # set an approximate value
99
                    reached = True
11✔
100
                    width_up = data_loc[j + 1] - data_loc[ipeak]
11✔
101
            j += 1
11✔
102

103
        if not good_up and not reached:
11✔
104
            # Reached edge of data before defining the width
105
            width_up = data_loc[j] - data_loc[ipeak]
11✔
106

107
        # Calculate the peak sigmas using logic based on whether or not the
108
        # HWHM was found on each leg
109
        if good_up and good_down:
11✔
110
            peak_sigmas.append((width_up + width_down) / checks.fwhm_const)
11✔
111
        elif good_up:
11✔
112
            peak_sigmas.append((2.0 * width_up) / checks.fwhm_const)
11✔
113
        elif good_down:
11✔
114
            peak_sigmas.append((2.0 * width_down) / checks.fwhm_const)
11✔
115
        else:
116
            peak_sigmas.append(2.0 * min([width_up, width_down])
11✔
117
                               / checks.fwhm_const)
118

119
    return peak_sigmas
11✔
120

121

122
def gauss_quad_err(params, xvals, yvals, weights):
11✔
123
    """Calculate the error of a quadriatic Gaussian fit.
124

125
    Parameters
126
    ----------
127
    params : set
128
        Set of parameters used by `mult_gauss_quad`
129
    xvals : float or array-like
130
        Location values
131
    yvals : float or array-like
132
        Intensity values
133
    weights : float or array-like
134
        Weights for each data value
135

136
    Returns
137
    -------
138
    err : float or array-like
139
        Weighted error of the fit defined by `params`
140

141
    See Also
142
    --------
143
    utils.distributions.mult_gauss_quad
144

145
    """
146
    # Define the error function to fit
147
    err = (distributions.mult_gauss_quad(xvals, params) - yvals) * weights
11✔
148

149
    return err
11✔
150

151

152
def get_fitting_params(mlat_bins, ilats, ilt, mean_intensity, num_gauss=3):
11✔
153
    """Find the parameters for a Gaussian fit with a quadratic background.
154

155
    Parameters
156
    ----------
157
    mlat_bins : np.array
158
        Magnetic latitude of output bin centres
159
    ilats : np.array
160
        Indices for magnetic latitudes with valid intensity values
161
    ilt : int
162
        Index for the magnetic local time
163
    mean_intensity : np.array
164
       2D array of mean intensity values
165
    num_gauss : int
166
        Maximum number of Gaussians to fit (default=3)
167

168
    Returns
169
    -------
170
    params : list
171
        List of the parameters needed to form a mult-Gaussian fit with a
172
        quadratic background.
173
    ipeaks : list
174
        List containing indexes of the normalized intensity peaks for the
175
        2D `mean_intesity` array down-selected by [`ilats`, `ilt`].
176

177
    """
178
    # Perform a first-order polynomial fit to the data to obtain estimates for
179
    # the background levels in the intensity profile. The first term is the
180
    # slope and the second term is the intercept.
181
    bg = np.polyfit(mlat_bins[ilats], mean_intensity[ilats, ilt], 1)
11✔
182

183
    # Normalize the intensity curve
184
    norm_intensity = (mean_intensity[ilats, ilt]
11✔
185
                      - mean_intensity[ilats, ilt].min()) / (
186
                          mean_intensity[ilats, ilt].max()
187
                          - mean_intensity[ilats, ilt].min())
188

189
    # Find the main peak and its characteristics
190
    ipeaks = [norm_intensity.argmax()]
11✔
191
    peak_sigmas = estimate_peak_widths(norm_intensity, mlat_bins[ilats], ipeaks,
11✔
192
                                       [norm_intensity[ipeaks[0]]])
193

194
    # Locate any additional peaks, up to a desired maximum
195
    mlt_gauss = num_gauss
11✔
196
    while len(ipeaks) < mlt_gauss:
11✔
197
        # Get a Gaussian for the prior peak
198
        prior_gauss = distributions.gauss(
11✔
199
            mlat_bins[ilats], norm_intensity[ipeaks[-1]],
200
            mlat_bins[ilats][ipeaks[-1]], peak_sigmas[-1], 0.0)
201

202
        # Remove the gaussian fit from the data
203
        norm_intensity -= prior_gauss
11✔
204

205
        # Get the next peak
206
        inew = norm_intensity.argmax()
11✔
207

208
        # Test to see if the new peak is significant
209
        if not np.any((mlat_bins[ilats][inew] > mlat_bins[ilats][ipeaks]
11✔
210
                       - np.asarray(peak_sigmas) * 2.0)
211
                      & (mlat_bins[ilats][inew] < mlat_bins[ilats][ipeaks]
212
                         + np.asarray(peak_sigmas) * 2.0)):
213
            ipeaks.append(inew)
11✔
214
            peak_sigmas.extend(estimate_peak_widths(
11✔
215
                norm_intensity, mlat_bins[ilats], [ipeaks[-1]],
216
                [norm_intensity[ipeaks[-1]]]))
217
        else:
218
            mlt_gauss = len(ipeaks)
11✔
219

220
    # Use the peak information to set the Gaussian fitting parameters
221
    params = [bg[1], bg[0], 0.0]
11✔
222
    for i, ipeak in enumerate(ipeaks):
11✔
223
        params.extend([mean_intensity[ilats, ilt][ipeak],
11✔
224
                       mlat_bins[ilats][ipeak], peak_sigmas[i]])
225

226
    return params, ipeaks
11✔
227

228

229
def get_gaussian_func_fit(mlat_bins, mlt_bins, mean_intensity, std_intensity,
11✔
230
                          num_intensity, num_gauss=3, min_num=3,
231
                          min_intensity=0.0, min_lat_perc=70.0):
232
    """Fit intensity data using least-squares minimization in MLT bins.
233

234
    Parameters
235
    ----------
236
    mlat_bins : np.array
237
        Magnetic latitude of output bin centres
238
    mlt_bins : np.array
239
        Magnetic local time of output bin centres
240
    mean_intensity : np.array
241
       2D array of mean intensity values
242
    std_intensity : np.array
243
       2D array of intensity standard deviations
244
    num_intensity : np.array
245
       2D array of intensity counts per bin
246
    num_gauss : int
247
        Maximum number of Gaussians to fit (default=3)
248
    min_num : int
249
        Minimum number of samples contributing to the intensity mean
250
        (default=3)
251
    min_intensity : float
252
        Minimum intensity magnitude in Rayleighs (default=0.0)
253
    min_lat_perc : int
254
        Minimum percentage of latitude bins needed to define an intensity
255
        function (default=70.0)
256

257
    Returns
258
    -------
259
    func_params : list-like
260
        Set of parameters used by `mult_gauss_quad` for each MLT bin that
261
        defines the mean intensity as a function of magnetic latitude or set
262
        to a string output with a reason if no successful fit was performed.
263
    fit_cov : list-like
264
        Covariance matrix for each fit
265
    fit_pearsonr : list-like
266
        Pearson r-value for correlation between the observations and the fit
267
    fit_pearsonp : list-like
268
        Pearson p-value for correlation between the observations and the fit
269
    num_peaks : list-like
270
        Number of peaks used in the fit
271

272
    See Also
273
    --------
274
    utils.distributions.mult_gauss_quad
275

276
    """
277
    # Initialize the output
278
    func_params = list()
11✔
279
    fit_cov = list()
11✔
280
    fit_pearsonr = list()
11✔
281
    fit_pearsonp = list()
11✔
282
    num_peaks = list()
11✔
283

284
    # Get the percentage multiplier
285
    perc_mult = 100.0 / len(mlat_bins)
11✔
286

287
    # Loop through each MLT bin
288
    for ilt, mlt in enumerate(mlt_bins):
11✔
289
        ilats = np.where(np.isfinite(mean_intensity[:, ilt])
11✔
290
                         & (num_intensity[:, ilt] >= min_num)
291
                         & (mean_intensity[:, ilt] >= min_intensity))[0]
292

293
        if len(ilats) * perc_mult >= min_lat_perc:
11✔
294
            # Get the peak information to set the Gaussian fitting parameters
295
            params, ipeaks = get_fitting_params(mlat_bins, ilats, ilt,
11✔
296
                                                mean_intensity, num_gauss)
297

298
            # Find the desired Gaussian + Quadratic fit using least-squares
299
            # if there are enough latitudes to provide a fit
300
            lsq_args = (mlat_bins[ilats], mean_intensity[ilats, ilt],
11✔
301
                        1.0 / std_intensity[ilats, ilt])
302
            num_peaks.append(len(ipeaks))
11✔
303
            if len(params) <= len(ilats):
11✔
304
                lsq_result = leastsq(gauss_quad_err, params, args=lsq_args,
11✔
305
                                     full_output=True)
306

307
                # Evaluate the least squares output and save the results
308
                if lsq_result[-1] in [1, 2, 3, 4]:
11✔
309
                    gauss_out = distributions.mult_gauss_quad(
11✔
310
                        mlat_bins[ilats], lsq_result[0])
311
                    fmask = np.isfinite(gauss_out)
11✔
312
                    if fmask.sum() > 3:
11✔
313
                        pres = stats.pearsonr(
11✔
314
                            mean_intensity[ilats, ilt][fmask],
315
                            gauss_out[fmask])
316
                        func_params.append(lsq_result[0])
11✔
317

318
                        # As of scipy 1.11.0 the output changed
319
                        if hasattr(pres, 'statistic'):
11✔
320
                            fit_pearsonr.append(pres.statistic)
11✔
321
                            fit_pearsonp.append(pres.pvalue)
11✔
322
                        else:
NEW
323
                            fit_pearsonr.append(pres[0])
×
NEW
324
                            fit_pearsonp.append(pres[1])
×
325

326
                        fit_cov.append(lsq_result[1])
11✔
327
                        found_fit = True
11✔
328
                    else:
329
                        found_fit = False
×
330
                else:
331
                    found_fit = False
11✔
332
            else:
333
                found_fit = False
11✔
334
                lsq_result = ["insufficient mlat coverage for expected peaks",
11✔
335
                              -1]
336

337
            if not found_fit:
11✔
338
                while not found_fit and len(ipeaks) > 1:
11✔
339
                    # Try reducing the number of peaks to obtain a valid fit
340
                    ipeaks.pop()
11✔
341
                    params = list(np.array(params)[:-3])
11✔
342

343
                    if len(params) <= len(ilats):
11✔
344
                        lsq_result = leastsq(gauss_quad_err, params,
11✔
345
                                             args=lsq_args, full_output=True)
346
                        num_peaks[-1] = len(ipeaks)
11✔
347
                        if lsq_result[-1] in [1, 2, 3, 4]:
11✔
348
                            gauss_out = distributions.mult_gauss_quad(
11✔
349
                                mlat_bins[ilats], lsq_result[0])
350
                            gmask = np.isfinite(gauss_out)
11✔
351

352
                            if sum(gmask) > 3:
11✔
353
                                pres = stats.pearsonr(
11✔
354
                                    mean_intensity[ilats, ilt][gmask],
355
                                    gauss_out[gmask])
356
                                found_fit = True
11✔
357
                                func_params.append(lsq_result[0])
11✔
358
                                fit_pearsonr.append(pres.statistic)
11✔
359
                                fit_pearsonp.append(pres.pvalue)
11✔
360
                                fit_cov.append(lsq_result[1])
11✔
361
                if not found_fit:
11✔
362
                    func_params.append(lsq_result[-2])
11✔
363
                    fit_cov.append(None)
11✔
364
                    fit_pearsonr.append(None)
11✔
365
                    fit_pearsonp.append(None)
11✔
366
        else:
367
            func_params.append("insufficient mlat coverage ({:} < {:})".format(
11✔
368
                len(ilats), min_lat_perc / perc_mult))
369
            fit_cov.append(None)
11✔
370
            fit_pearsonr.append(None)
11✔
371
            fit_pearsonp.append(None)
11✔
372
            num_peaks.append(0)
11✔
373

374
    return func_params, fit_cov, fit_pearsonr, fit_pearsonp, num_peaks
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc