• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Keck-DataReductionPipelines / KPF-Pipeline / #1526

15 Dec 2025 06:00PM UTC coverage: 42.274% (+0.006%) from 42.268%
#1526

push

coveralls-python

web-flow
Merge branch 'develop' into debug-thar-l1

2 of 2 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

11526 of 27265 relevant lines covered (42.27%)

0.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.56
/modules/stray_light/src/alg.py
1
import warnings
2
from datetime import datetime, timedelta
1✔
3

4
import astropy.constants as apc
1✔
5
from astropy.stats import mad_std
1✔
6
from copy import deepcopy
1✔
7
import matplotlib.pyplot as plt
1✔
8
import numpy as np
1✔
9
from numpy.polynomial import polynomial as poly
1✔
10
from numpy.polynomial.legendre import legval
1✔
11
import pandas as pd
1✔
12
from scipy.ndimage import median_filter, gaussian_filter, distance_transform_edt
1✔
13
from scipy.interpolate import LSQUnivariateSpline, CubicSpline
1✔
14

15
from kpfpipe.config.pipeline_config import ConfigClass
1✔
16
from kpfpipe.logger import start_logger
1✔
17
from modules.Utils.config_parser import ConfigHandler
1✔
18

19
class StrayLightAlg:
1✔
20
    """
21
    This module defines 'StrayLight' and methods to perform stray (scattered) light estimation from inter-order pixels.
22

23
    Args:
24
        target_2D (KPF0): A KPF 2D science object
25
        master_order_mask (KPF0): a KPF master order mask
26
        config (configparser.ConfigParser): Config context
27
        logger (logging.Logger): Instance of logging.Logger
28
    
29
    Attributes:
30
        rawimage (np.ndarray): From parameter 'rawimage'.
31

32
    """
33
    def __init__(self, 
1✔
34
                 target_2D, 
35
                 master_order_mask,
36
                 default_config_path,
37
                 logger=None
38
                ):
39
        """
40
        Inits StrayLight class with raw data, order mask, config, logger.
41
        
42
        Args:
43
            target_2D (KPF0): A KPF 2D science object
44
            master_order_mask (KPF0): a KPF master order mask
45
            config (configparser.ConfigParser): Config context
46
            logger (logging.Logger): Instance of logging.Logger
47
        """
48

49
        # Input arguments
50
        self.config = ConfigClass(default_config_path)
1✔
51
        if logger == None:
1✔
52
            self.log = start_logger('StrayLight', default_config_path)
1✔
53
        else:
54
            self.log = logger
×
55
            
56
        cfg_params = ConfigHandler(self.config, 'PARAM')
1✔
57
        self.method = str(cfg_params.get_config_value('method'))
1✔
58
        self.polyorder = int(cfg_params.get_config_value('polyorder'))
1✔
59
        try:
1✔
60
            self.regularize = float(cfg_params.get_config_value('method'))
1✔
61
        except ValueError:
1✔
62
            self.regularize = str(cfg_params.get_config_value('regularize'))
1✔
63
        self.edge_clip = int(cfg_params.get_config_value('edge_clip'))
1✔
64
        self.mask_buffer = int(cfg_params.get_config_value('mask_buffer'))
1✔
65

66
        self.target_2D = target_2D        
1✔
67
        self.master_order_mask = master_order_mask
1✔
68
        self.drptag = self.target_2D.header['PRIMARY']['DRPTAG']
1✔
69

70
    
71
    def add_keywords(self, stray_light_image, inter_order_mask, chip):
1✔
72
        """
73
        Adds keywords to track basic stray light statistics (mean, rms, min, max)
74
        """
75
        header = self.target_2D.header['PRIMARY']
1✔
76

77
        if self.method == 'polynomial':
1✔
78
            method = f"{self.method}_{self.polyorder}"
1✔
79
        else:
80
            method = self.method
×
81

82
        if chip == 'GREEN':
1✔
83
            slg = stray_light_image[~inter_order_mask]
1✔
84

85
            header['SLGMETH'] = method            # COMMENT method used to estimate stray light for GREEN
1✔
86
            header['SLGMEAN'] = np.mean(slg)      # COMMENT mean of GREEN inter-order stray light
1✔
87
            header['SLGRMS']  = np.std(slg)       # COMMENT standard deviation of GREEN inter-order stray light
1✔
88
            header['SLGMIN']  = np.min(slg)       # COMMENT minimum of GREEN inter-order stray light
1✔
89
            header['SLGMAX']  = np.max(slg)       # COMMENT maximum of GREEN inter-order stray light
1✔
90

91
        elif chip == 'RED':
1✔
92
            slr = stray_light_image[~inter_order_mask]
1✔
93

94
            header['SLRMETH'] = method            # COMMENT method used to estimate stray light for RED
1✔
95
            header['SLRMEAN'] = np.mean(slr)      # COMMENT mean of RED inter-order stray light
1✔
96
            header['SLRRMS']  = np.std(slr)       # COMMENT standard deviation of RED inter-order stray light
1✔
97
            header['SLRMIN']  = np.min(slr)       # COMMENT minimum of RED inter-order stray light
1✔
98
            header['SLRMAX']  = np.max(slr)       # COMMENT maximum of RED inter-order stray light
1✔
99

100

101
    def remove_stray_light(self, 
1✔
102
                             chip, 
103
                             method=None, 
104
                             polyorder=None, 
105
                             regularize=None,
106
                             edge_clip=None, 
107
                             mask_buffer=None,
108
                             return_model=False
109
                             ):
110
        """
111
        Main method used to estimate and remove stray light
112
        Calls method defined in config file; allowed methods are 'zero', 'mean', and 'polynomial'
113

114
        Args:
115
            chip (str) : 'GREEN' or 'RED' to select which CCD to fit
116
            method (str) : fitting method, allowed methods are 'zero', 'mean' and 'polynomial'
117
            polyorder (int) : order of polynomial
118
            regularize : can be 'none', 'auto', or a positive float (lambda parameter for T2-ridge regression)
119
            edge_clip (int) : number of pixels to ignore on each edge (default=0)
120
            mask_buffer (int): illuminated mask region will be widened by N=mask_buffer pixels
121
            return_model (bool): True to return ndarrays for smooth stray light model and inter-order mask
122
            
123
        Returns:
124
            out_2D (KPF0): level 2D object with stray light removed from CCD image
125
            if return_model=True:
126
                stray_light_image (dict of ndarrays): stray light images for GREEN and RED ccds
127
                inter_order_mask (dict of ndarrays): boolean masks of inter-order pixels for GREEN and RED ccds
128
        """
129
        # set parameters
130
        if method is None:
1✔
131
            method = self.method
1✔
132
        if polyorder is None:
1✔
133
            polyorder = self.polyorder
1✔
134
        if regularize is None:
1✔
135
            regularize = self.regularize
1✔
136
        if edge_clip is None:
1✔
137
            edge_clip = self.edge_clip
1✔
138
        if mask_buffer is None:
1✔
139
            mask_buffer = self.mask_buffer
1✔
140

141
        # select method and estimate stray light
142
        try:
1✔
143
            stray_light_method = self.__getattribute__(method)
1✔
144
        except AttributeError:
×
145
            #self.log.error(f'Stray light method {self.method} not implemented.')
146
            raise(AttributeError)
×
147

148
        stray_light_image, inter_order_mask = stray_light_method(chip, 
1✔
149
                                                                 method=method, 
150
                                                                 polyorder=polyorder, 
151
                                                                 regularize=regularize,
152
                                                                 edge_clip=edge_clip, 
153
                                                                 mask_buffer=mask_buffer
154
                                                                )
155
        
156
        self.add_keywords(stray_light_image, inter_order_mask, chip)
1✔
157

158
        # remove stray light from science image
159
        out_2D = self.target_2D
1✔
160
        out_2D[f'{chip}_CCD'] -= stray_light_image
1✔
161

162
        if return_model:
1✔
163
            return out_2D, stray_light_image, inter_order_mask
×
164

165
        return out_2D
1✔
166

167
    
168
    def zero(self, chip, **kwargs):
1✔
169
        """
170
        Method to estimate stray light -- returns zero (i.e. no stray light)
171
        """
172
        mask = self._inter_order_mask(chip).astype('bool')
×
UNCOV
173
        stray_light = np.zeros_like(self.target_2D[f'{chip}_CCD'])
×
174

175
        return stray_light, mask
×
176

177
    
178
    def mean(self, chip, edge_clip=0, mask_buffer=None, **kwargs):
1✔
179
        """
180
        Method to estimate stray light -- returns mean of inter-order pixels
181
        """
182
        data = np.array(self.target_2D[f'{chip}_CCD'].data)
×
183
        mask = self._inter_order_mask(chip, mask_buffer=mask_buffer).astype('bool')
×
184
        
185
        if edge_clip > 0:
×
186
            d = data[edge_clip:-edge_clip,edge_clip:-edge_clip]
×
187
            m = mask[edge_clip:-edge_clip,edge_clip:-edge_clip]
×
188
        
189
        stray_light = np.mean(d[~m])*np.ones_like(d)
×
190
    
191
        return stray_light, mask
×
192

193
    
194
    def polynomial(self, chip, polyorder, regularize='none', edge_clip=0, mask_buffer=None, **kwargs):
1✔
195
        """
196
        Method to estimate stray light -- fits a 2D polynomial to inter-order pixels
197
        """
198
        data = deepcopy(np.array(self.target_2D[f'{chip}_CCD'].data))
1✔
199
        mask = deepcopy(self._inter_order_mask(chip, mask_buffer=mask_buffer).astype(bool))
1✔
200

201
        if edge_clip > 0:
1✔
202
            edge_mask = np.ones_like(mask)
1✔
203
            edge_mask[edge_clip:-edge_clip,edge_clip:-edge_clip] = 0
1✔
204
            mask |= edge_mask
1✔
205

206
        coeffs = self._polyfit2d(data, polyorder, regularize=regularize, mask=mask)    
1✔
207
        
208
        stray_light = self._polyval2d(coeffs, polyorder, data.shape)
1✔
209
        stray_light = np.maximum(stray_light, 0)
1✔
210
        stray_light += np.nanmedian((data - stray_light)[~mask])
1✔
211
        stray_light = np.maximum(stray_light, 0)
1✔
212

213
        return stray_light, mask
1✔
214

215

216
    def columns(self, chip, polyorder, gaussian_sigma=128, edge_clip=0, mask_buffer=None, **kwargs):
1✔
217
        """
218
        Method to estimate stray light -- fits a 2D polynomial to inter-order pixels
219
        """
220
        data = deepcopy(np.array(self.target_2D[f'{chip}_CCD'].data))
×
221
        mask = deepcopy(self._inter_order_mask(chip, mask_buffer=mask_buffer).astype(bool))
×
222

223
        if edge_clip > 0:
×
224
            edge_mask = np.ones_like(mask)
×
225
            edge_mask[edge_clip:-edge_clip,edge_clip:-edge_clip] = 0
×
226
            mask |= edge_mask
×
227

228
        coeffs = self._polyfit_columns(data, polyorder, mask=mask)
×
229
        stray_light = self._polyval_columns(coeffs, data.shape[0])
×
230
        stray_light = self._patch_nan_nearest(stray_light)
×
231
        stray_light = gaussian_filter(stray_light, gaussian_sigma, mode='reflect', truncate=4.0)
×
232
        stray_light += np.nanmedian((data - stray_light)[~mask])
×
233
        stray_light = np.maximum(stray_light, 0)
×
234

235
        return stray_light, mask
×
236
        
237

238
    def _polyfit2d(self, data_image, polyorder, regularize='none', mask=None):
1✔
239
        # coordinate grid
240
        nrow, ncol = data_image.shape
1✔
241
        y, x = np.mgrid[0:nrow, 0:ncol]
1✔
242

243
        # map to [-1,1] for Legendre polynomial basis
244
        x = 2*x/(ncol-1) - 1
1✔
245
        y = 2*y/(nrow-1) - 1
1✔
246

247
        # ravel arrays
248
        x = np.ravel(x)
1✔
249
        y = np.ravel(y)
1✔
250
        z = np.ravel(data_image)
1✔
251
    
252
        # mask exposed pixels
253
        if mask is not None:
1✔
254
            mask = mask.astype(bool).ravel()
1✔
255
        else:
256
            mask = np.zeros((nrow,ncol), dtype=bool).ravel()
×
257

258
        x = x[~mask]
1✔
259
        y = y[~mask]
1✔
260
        z = z[~mask]
1✔
261
        
262
        # build design matrix
263
        terms = []
1✔
264
        for i in range(polyorder + 1):
1✔
265
            for j in range(polyorder + 1 - i):
1✔
266
                cx = np.zeros(i+1)
1✔
267
                cy = np.zeros(j+1)
1✔
268
                cx[i] = 1
1✔
269
                cy[j] = 1
1✔
270
                Px = legval(x,cx)
1✔
271
                Py = legval(y,cy)
1✔
272
                terms.append(Px*Py)
1✔
273

274
        A = np.column_stack(terms)
1✔
275

276
        # solve least squares
277
        coeffs, _, _, _ = np.linalg.lstsq(A, z, rcond=None)
1✔
278

279
        # apply regularization and recompute coefficients
280
        if isinstance(regularize, float):
1✔
281
            lam = regularize
×
282

283
        elif isinstance(regularize, str):
1✔
284
            if regularize == 'auto':
1✔
285
                lam = np.var(z)/np.var(coeffs)
1✔
286
            elif regularize == 'none':
×
287
                lam = 0
×
288
            else:
289
                raise ValueError("regularize must be 'auto', 'none', or a float value")
×
290

291
        if lam == 0:
1✔
292
            pass
×
293
        elif lam > 0:
1✔
294
            ATz = np.dot(A.T,z)
1✔
295
            ATA = np.dot(A.T,A) + lam*np.eye(A.shape[1])
1✔
296
            coeffs = np.linalg.solve(ATA,ATz)
1✔
297
        elif lam < 0:
×
298
            raise ValueError("regularization parameter lambda must not be negative")
×
299
            
300
        return coeffs
1✔
301
    
302
    
303
    def _polyval2d(self, coeffs, polyorder, shape):
1✔
304
        # coordinate grid
305
        nrow, ncol = shape
1✔
306
        y, x = np.mgrid[0:nrow, 0:ncol]
1✔
307

308
        # map to [-1,1] for Legendre polynomial basis
309
        x = 2*x/(ncol-1) - 1
1✔
310
        y = 2*y/(nrow-1) - 1
1✔
311

312
        # ravel arrays
313
        x = np.ravel(x)
1✔
314
        y = np.ravel(y)
1✔
315

316
        # build polynomial terms
317
        terms = []
1✔
318
        for i in range(polyorder + 1):
1✔
319
            for j in range(polyorder + 1 - i):
1✔
320
                cx = np.zeros(i+1)
1✔
321
                cy = np.zeros(j+1)
1✔
322
                cx[i] = 1
1✔
323
                cy[j] = 1
1✔
324
                Px = legval(x,cx)
1✔
325
                Py = legval(y,cy)
1✔
326
                terms.append(Px*Py)
1✔
327
        terms = np.array(terms)
1✔
328

329
        # calculate fitted polynomial
330
        result = np.dot(coeffs, terms).reshape((nrow,ncol))
1✔
331
    
332
        return result
1✔
333

334

335
    def _polyfit_columns(self, data_image, polyorder, mask=None):
1✔
336
        nrow, ncol = data_image.shape
×
337
        y = np.arange(nrow)
×
338
        
339
        # mask exposed pixels
340
        if mask is not None:
×
341
            mask = mask.astype(bool)
×
342
        else:
343
            mask = np.zeros((nrow,ncol), dtype=bool)
×
344

345
        V = np.vander(y, polyorder + 1, increasing=True)
×
346
        coeffs = np.full((polyorder + 1, ncol), np.nan, dtype=float)
×
347

348
        for j in range(ncol):
×
349
            m = ~mask[:, j]
×
350
            
351
            if np.count_nonzero(m) > polyorder:
×
352
                Vj = V[m, :]
×
353
                yj = data_image[m, j]
×
354
                c, *_ = np.linalg.lstsq(Vj, yj, rcond=None)
×
355
                coeffs[:, j] = c
×
356
        
357
        return coeffs
×
358

359

360
    def _polyval_columns(self, coeffs, nrow):
1✔
361
        ncoeffs, ncol = coeffs.shape
×
362
        y = np.arange(nrow)
×
363
        V = np.vander(y, ncoeffs, increasing=True)
×
364
        
365
        return np.dot(V, coeffs)
×
366

367

368
    def _patch_nan_nearest(self, data_image):
1✔
369
        bad = np.isnan(data_image)
×
370
        
371
        if not np.any(bad):
×
372
            return data_image.copy()
×
373

374
        indices = distance_transform_edt(bad, return_distances=False, return_indices=True)
×
375

376
        return data_image[tuple(indices)]
×
377
    
378
    
379
    def _inter_order_mask(self, chip, dark_fibers=None, mask_buffer=None):
1✔
380
        """
381
        Args:
382
          dark_fibers (list) : list of integers 1-5 indicating any non-illuminated fibers
383
          mask_buffer (int) : number of pixels to expand intra-order region of mask
384
        """
385
        mask = self.master_order_mask[f'{chip}_CCD'] > 0
1✔
386

387
        if dark_fibers is not None:
1✔
388
            for fiber in dark_fibers:
×
389
                mask &= self.master_order_mask[f'{chip}_CCD'] != fiber
×
390

391
        if mask_buffer is not None:
1✔
392
            for i in range(mask_buffer):
1✔
393
                buffer = np.zeros_like(mask)
1✔
394
            
395
                row_diff = mask[:-1,:] != mask[1:,:]
1✔
396
                col_diff = mask[:,:-1] != mask[:,1:]
1✔
397
            
398
                buffer[:-1,:] |= row_diff
1✔
399
                buffer[1:,:] |= row_diff
1✔
400
                buffer[:,:-1] |= col_diff
1✔
401
                buffer[:,1:] |= col_diff
1✔
402
        
403
                mask |= buffer
1✔
404

405
        return mask
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc