• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sibirrer / lenstronomy / 4875746828

pending completion
4875746828

push

github

Simon Birrer
Merge remote-tracking branch 'origin/main'

18534 of 19151 relevant lines covered (96.78%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.1
/lenstronomy/PointSource/point_source.py
1
import numpy as np
1✔
2
import copy
1✔
3
from lenstronomy.PointSource.point_source_cached import PointSourceCached
1✔
4

5
__all__ = ['PointSource']
1✔
6

7
_SUPPORTED_MODELS = ['UNLENSED', 'LENSED_POSITION', 'SOURCE_POSITION']
1✔
8

9

10
class PointSource(object):
1✔
11

12
    def __init__(self, point_source_type_list, lensModel=None, fixed_magnification_list=None,
1✔
13
                 additional_images_list=None, flux_from_point_source_list=None, magnification_limit=None,
14
                 save_cache=False, kwargs_lens_eqn_solver=None):
15
        """
16

17
        :param point_source_type_list: list of point source types
18
        :param lensModel: instance of the LensModel() class
19
        :param fixed_magnification_list: list of booleans (same length as point_source_type_list).
20
         If True, magnification ratio of point sources is fixed to the one given by the lens model.
21
         This option then requires to provide a 'source_amp' amplitude of the source brightness instead of
22
         'point_amp' the list of image brightnesses.
23
        :param additional_images_list: list of booleans (same length as point_source_type_list). If True, search for
24
         additional images of the same source is conducted.
25
        :param flux_from_point_source_list: list of booleans (optional), if set, will only return image positions
26
         (for imaging modeling) for the subset of the point source lists that =True. This option enables to model
27
         imaging data with transient point sources, when the point source positions are measured and present at a
28
         different time than the imaging data
29
        :param magnification_limit: float >0 or None, if float is set and additional images are computed, only those
30
         images will be computed that exceed the lensing magnification (absolute value) limit
31
        :param save_cache: bool, saves image positions and only if delete_cache is executed, a new solution of the lens
32
         equation is conducted with the lens model parameters provided. This can increase the speed as multiple times
33
         the image positions are requested for the same lens model. Attention in usage!
34
        :param kwargs_lens_eqn_solver: keyword arguments specifying the numerical settings for the lens equation solver
35
         see LensEquationSolver() class for details ,such as:
36
         min_distance=0.01, search_window=5, precision_limit=10**(-10), num_iter_max=100
37
        """
38
        self._lensModel = lensModel
1✔
39
        self.point_source_type_list = point_source_type_list
1✔
40
        self._point_source_list = []
1✔
41
        if fixed_magnification_list is None:
1✔
42
            fixed_magnification_list = [False] * len(point_source_type_list)
1✔
43
        self._fixed_magnification_list = fixed_magnification_list
1✔
44
        if additional_images_list is None:
1✔
45
            additional_images_list = [False] * len(point_source_type_list)
1✔
46
        if flux_from_point_source_list is None:
1✔
47
            flux_from_point_source_list = [True] * len(point_source_type_list)
1✔
48
        self._flux_from_point_source_list = flux_from_point_source_list
1✔
49
        for i, model in enumerate(point_source_type_list):
1✔
50
            if model == 'UNLENSED':
1✔
51
                from lenstronomy.PointSource.Types.unlensed import Unlensed
1✔
52
                self._point_source_list.append(PointSourceCached(Unlensed(), save_cache=save_cache))
1✔
53
            elif model == 'LENSED_POSITION':
1✔
54
                from lenstronomy.PointSource.Types.lensed_position import LensedPositions
1✔
55
                self._point_source_list.append(PointSourceCached(LensedPositions(lensModel, fixed_magnification=fixed_magnification_list[i],
1✔
56
                                                                                 additional_images=additional_images_list[i]),
57
                                                                 save_cache=save_cache))
58
            elif model == 'SOURCE_POSITION':
1✔
59
                from lenstronomy.PointSource.Types.source_position import SourcePositions
1✔
60
                self._point_source_list.append(PointSourceCached(SourcePositions(lensModel,
1✔
61
                                                                 fixed_magnification=fixed_magnification_list[i]),
62
                                                                 save_cache=save_cache))
63
            else:
64
                raise ValueError("Point-source model %s not available. Supported models are %s ."
1✔
65
                                 % (model, _SUPPORTED_MODELS))
66
        if kwargs_lens_eqn_solver is None:
1✔
67
            kwargs_lens_eqn_solver = {}
1✔
68
        self._kwargs_lens_eqn_solver = kwargs_lens_eqn_solver
1✔
69
        self._magnification_limit = magnification_limit
1✔
70
        self._save_cache = save_cache
1✔
71

72
    def update_search_window(self, search_window, x_center, y_center, min_distance=None, only_from_unspecified=False):
1✔
73
        """
74
        update the search area for the lens equation solver
75

76
        :param search_window: search_window: window size of the image position search with the lens equation solver.
77
        :param x_center: center of search window
78
        :param y_center: center of search window
79
        :param min_distance: minimum search distance
80
        :param only_from_unspecified: bool, if True, only sets keywords that previously have not been set
81
        :return: updated self instances
82
        """
83
        if min_distance is not None and 'min_distance' not in self._kwargs_lens_eqn_solver and only_from_unspecified:
1✔
84
            self._kwargs_lens_eqn_solver['min_distance'] = min_distance
1✔
85
        if only_from_unspecified:
1✔
86
            self._kwargs_lens_eqn_solver['search_window'] = self._kwargs_lens_eqn_solver.get('search_window',
1✔
87
                                                                                             search_window)
88
            self._kwargs_lens_eqn_solver['x_center'] = self._kwargs_lens_eqn_solver.get('x_center', x_center)
1✔
89
            self._kwargs_lens_eqn_solver['y_center'] = self._kwargs_lens_eqn_solver.get('y_center', y_center)
1✔
90
        else:
91
            self._kwargs_lens_eqn_solver['search_window'] = search_window
1✔
92
            self._kwargs_lens_eqn_solver['x_center'] = x_center
1✔
93
            self._kwargs_lens_eqn_solver['y_center'] = y_center
1✔
94

95
    def update_lens_model(self, lens_model_class):
1✔
96
        """
97

98
        :param lens_model_class: instance of LensModel class
99
        :return: update instance of lens model class
100
        """
101
        self.delete_lens_model_cache()
1✔
102
        self._lensModel = lens_model_class
1✔
103
        for model in self._point_source_list:
1✔
104
            model.update_lens_model(lens_model_class=lens_model_class)
1✔
105

106
    def delete_lens_model_cache(self):
1✔
107
        """
108
        deletes the variables saved for a specific lens model
109

110
        :return: None
111
        """
112
        for model in self._point_source_list:
1✔
113
            model.delete_lens_model_cache()
1✔
114

115
    def set_save_cache(self, save_cache):
1✔
116
        """
117
        set the save cache boolean to new value
118

119
        :param save_cache: bool, if True, saves (or uses a previously saved) values
120
        :return: updated class and sub-class instances to either save or not save the point source information in cache
121
        """
122
        self._set_save_cache(save_cache)
1✔
123
        self._save_cache = save_cache
1✔
124

125
    def _set_save_cache(self, save_cache):
1✔
126
        """
127
        set the save cache boolean to new value. This function is for use within this class for temporarily set the
128
        cache within a single routine.
129

130
        :param save_cache: bool, if True, saves (or uses a previously saved) values
131
        :return: None
132
        """
133
        for model in self._point_source_list:
1✔
134
            model.set_save_cache(save_cache)
1✔
135

136
    def source_position(self, kwargs_ps, kwargs_lens):
1✔
137
        """
138
        intrinsic source positions of the point sources
139

140
        :param kwargs_ps: keyword argument list of point source models
141
        :param kwargs_lens: keyword argument list of lens models
142
        :return: list of source positions for each point source model
143
        """
144
        x_source_list = []
1✔
145
        y_source_list = []
1✔
146
        for i, model in enumerate(self._point_source_list):
1✔
147
            kwargs = kwargs_ps[i]
1✔
148
            x_source, y_source = model.source_position(kwargs, kwargs_lens)
1✔
149
            x_source_list.append(x_source)
1✔
150
            y_source_list.append(y_source)
1✔
151
        return x_source_list, y_source_list
1✔
152

153
    def image_position(self, kwargs_ps, kwargs_lens, k=None, original_position=False, additional_images=False):
1✔
154
        """
155
        image positions as observed on the sky of the point sources
156

157
        :param kwargs_ps: point source parameter keyword argument list
158
        :param kwargs_lens: lens model keyword argument list
159
        :param k: None, int or boolean list; only returns a subset of the model predictions
160
        :param original_position: boolean (only applies to 'LENSED_POSITION' models), returns the image positions in
161
         the model parameters and does not re-compute images (which might be differently ordered) in case of the lens
162
         equation solver
163
        :param additional_images: if True, solves the lens equation for additional images
164
        :type additional_images: bool
165
        :return: list of: list of image positions per point source model component
166
        """
167
        x_image_list = []
1✔
168
        y_image_list = []
1✔
169
        for i, model in enumerate(self._point_source_list):
1✔
170
            if k is None or k == i:
1✔
171
                kwargs = kwargs_ps[i]
1✔
172
                x_image, y_image = model.image_position(kwargs, kwargs_lens,
1✔
173
                                                        magnification_limit=self._magnification_limit,
174
                                                        kwargs_lens_eqn_solver=self._kwargs_lens_eqn_solver,
175
                                                        additional_images=additional_images)
176
                # this takes action when new images are computed not necessary in order
177
                if original_position is True and additional_images is True and\
1✔
178
                        self.point_source_type_list[i] == 'LENSED_POSITION':
179
                    x_o, y_o = kwargs['ra_image'], kwargs['dec_image']
×
180
                    x_image, y_image = _sort_position_by_original(x_o, y_o, x_image, y_image)
×
181

182
                x_image_list.append(x_image)
1✔
183
                y_image_list.append(y_image)
1✔
184
        return x_image_list, y_image_list
1✔
185

186
    def point_source_list(self, kwargs_ps, kwargs_lens, k=None, with_amp=True):
1✔
187
        """
188
        returns the coordinates and amplitudes of all point sources in a single array
189

190
        :param kwargs_ps: point source keyword argument list
191
        :param kwargs_lens: lens model keyword argument list
192
        :param k: None, int or list of int's to select a subset of the point source models in the return
193
        :param with_amp: bool, if False, ignores the amplitude parameters in the return and instead provides ones for
194
         each point source image
195
        :return: ra_array, dec_array, amp_array
196
        """
197
        # here we save the cache of the individual models but do not overwrite the class boolean variable to do so
198
        self._set_save_cache(True)
1✔
199
        # we make sure we do not re-compute the image positions twice when evaluating position and their amplitudes
200
        ra_list, dec_list = self.image_position(kwargs_ps, kwargs_lens, k=k)
1✔
201
        if with_amp is True:
1✔
202
            amp_list = self.image_amplitude(kwargs_ps, kwargs_lens, k=k)
1✔
203
        else:
204
            amp_list = np.ones_like(ra_list)
1✔
205

206
        # here we delete the individual modeling caches in case this was the option
207
        if self._save_cache is False:
1✔
208
            self.delete_lens_model_cache()
1✔
209
            self._set_save_cache(self._save_cache)
1✔
210

211
        ra_array, dec_array, amp_array = [], [], []
1✔
212
        for i, ra in enumerate(ra_list):
1✔
213
            for j in range(len(ra)):
1✔
214
                ra_array.append(ra_list[i][j])
1✔
215
                dec_array.append(dec_list[i][j])
1✔
216
                amp_array.append(amp_list[i][j])
1✔
217
        return ra_array, dec_array, amp_array
1✔
218

219
    def num_basis(self, kwargs_ps, kwargs_lens):
1✔
220
        """
221
        number of basis functions for linear inversion
222

223
        :param kwargs_ps: point source keyword argument list
224
        :param kwargs_lens: lens model keyword argument list
225
        :return: int
226
        """
227
        n = 0
1✔
228
        ra_pos_list, dec_pos_list = self.image_position(kwargs_ps, kwargs_lens)
1✔
229
        for i, model in enumerate(self.point_source_type_list):
1✔
230
            if self._flux_from_point_source_list[i]:
1✔
231
                if self._fixed_magnification_list[i]:
1✔
232
                    n += 1
1✔
233
                else:
234
                    n += len(ra_pos_list[i])
1✔
235
        return n
1✔
236

237
    def image_amplitude(self, kwargs_ps, kwargs_lens, k=None):
1✔
238
        """
239
        returns the image amplitudes
240

241
        :param kwargs_ps: point source keyword argument list
242
        :param kwargs_lens: lens model keyword argument list
243
        :param k: None, int or list of int's to select a subset of the point source models in the return
244
        :return: list of image amplitudes per model component
245
        """
246
        amp_list = []
1✔
247
        for i, model in enumerate(self._point_source_list):
1✔
248
            if (k is None or k == i) and self._flux_from_point_source_list[i]:
1✔
249
                amp_list.append(model.image_amplitude(kwargs_ps=kwargs_ps[i], kwargs_lens=kwargs_lens,
1✔
250
                                                      kwargs_lens_eqn_solver=self._kwargs_lens_eqn_solver))
251
        return amp_list
1✔
252

253
    def source_amplitude(self, kwargs_ps, kwargs_lens):
1✔
254
        """
255
        intrinsic (unlensed) point source amplitudes
256

257
        :param kwargs_ps: point source keyword argument list
258
        :param kwargs_lens: lens model keyword argument list
259
        :return: list of intrinsic (unlensed) point source amplitudes
260
        """
261
        amp_list = []
1✔
262
        for i, model in enumerate(self._point_source_list):
1✔
263
            if self._flux_from_point_source_list[i]:
1✔
264
                amp_list.append(model.source_amplitude(kwargs_ps=kwargs_ps[i], kwargs_lens=kwargs_lens))
1✔
265
        return amp_list
1✔
266

267
    def linear_response_set(self, kwargs_ps, kwargs_lens=None, with_amp=False):
1✔
268
        """
269

270
        :param kwargs_ps: point source keyword argument list
271
        :param kwargs_lens: lens model keyword argument list
272
        :param with_amp: bool, if True returns the image amplitude derived from kwargs_ps,
273
         otherwise the magnification of the lens model
274
        :return: ra_pos, dec_pos, amp, n
275
        """
276
        ra_pos = []
1✔
277
        dec_pos = []
1✔
278
        amp = []
1✔
279
        self._set_save_cache(True)
1✔
280
        x_image_list, y_image_list = self.image_position(kwargs_ps, kwargs_lens)
1✔
281
        for i, model in enumerate(self._point_source_list):
1✔
282
            if self._flux_from_point_source_list[i]:
1✔
283
                x_pos = x_image_list[i]
1✔
284
                y_pos = y_image_list[i]
1✔
285
                if self._fixed_magnification_list[i]:
1✔
286
                    ra_pos.append(list(x_pos))
1✔
287
                    dec_pos.append(list(y_pos))
1✔
288
                    if with_amp:
1✔
289
                        mag = self.image_amplitude(kwargs_ps, kwargs_lens, k=i)[0]
1✔
290
                    else:
291
                        mag = self._lensModel.magnification(x_pos, y_pos, kwargs_lens)
1✔
292
                        mag = np.abs(mag)
1✔
293
                    amp.append(list(mag))
1✔
294
                else:
295
                    if with_amp:
1✔
296
                        mag = self.image_amplitude(kwargs_ps, kwargs_lens, k=i)[0]
1✔
297
                    else:
298
                        mag = np.ones_like(x_pos)
1✔
299
                    for j in range(len(x_pos)):
1✔
300
                        ra_pos.append([x_pos[j]])
1✔
301
                        dec_pos.append([y_pos[j]])
1✔
302
                        amp.append([mag[j]])
1✔
303
        n = len(ra_pos)
1✔
304
        if self._save_cache is False:
1✔
305
            self.delete_lens_model_cache()
1✔
306
            self._set_save_cache(self._save_cache)
1✔
307
        return ra_pos, dec_pos, amp, n
1✔
308

309
    def update_linear(self, param, i, kwargs_ps, kwargs_lens):
1✔
310
        """
311

312
        :param param: list of floats corresponding ot the parameters being sampled
313
        :param i: index of the first parameter relevant for this class
314
        :param kwargs_ps: point source keyword argument list
315
        :param kwargs_lens: lens model keyword argument list
316
        :return: kwargs_ps with updated linear parameters, index of the next parameter relevant for another class
317
        """
318
        ra_pos_list, dec_pos_list = self.image_position(kwargs_ps, kwargs_lens)
1✔
319
        for k, model in enumerate(self._point_source_list):
1✔
320
            if self._flux_from_point_source_list[k]:
1✔
321
                kwargs = kwargs_ps[k]
1✔
322
                if self._fixed_magnification_list[k]:
1✔
323
                    kwargs['source_amp'] = param[i]
1✔
324
                    i += 1
1✔
325
                else:
326
                    n_points = len(ra_pos_list[k])
1✔
327
                    kwargs['point_amp'] = np.array(param[i:i + n_points])
1✔
328
                    i += n_points
1✔
329
        return kwargs_ps, i
1✔
330

331
    def linear_param_from_kwargs(self, kwargs_list):
1✔
332
        """
333
        inverse function of update_linear() returning the linear amplitude list for the keyword argument list
334

335
        :param kwargs_list: model parameters including the linear amplitude parameters
336
        :type kwargs_list: list of keyword arguments
337
        :return: list of linear amplitude parameters
338
        :rtype: list
339
        """
340
        param = []
1✔
341
        for k, model in enumerate(self._point_source_list):
1✔
342
            if self._flux_from_point_source_list[k]:
1✔
343
                kwargs = kwargs_list[k]
1✔
344
                if self._fixed_magnification_list[k]:
1✔
345
                    param.append(kwargs['source_amp'])
1✔
346
                else:
347
                    for a in kwargs['point_amp']:
1✔
348
                        param.append(a)
1✔
349
        return param
1✔
350

351
    def check_image_positions(self, kwargs_ps, kwargs_lens, tolerance=0.001):
1✔
352
        """
353
        checks whether the point sources in kwargs_ps satisfy the lens equation with a tolerance
354
        (computed by ray-tracing in the source plane)
355

356
        :param kwargs_ps: point source keyword argument list
357
        :param kwargs_lens: lens model keyword argument list
358
        :param tolerance: Eucledian distance between the source positions ray-traced backwards to be tolerated
359
        :return: bool: True, if requirement on tolerance is fulfilled, False if not.
360
        """
361
        x_image_list, y_image_list = self.image_position(kwargs_ps, kwargs_lens)
1✔
362
        for i, model in enumerate(self.point_source_type_list):
1✔
363
            if model in ['LENSED_POSITION', 'SOURCE_POSITION']:
1✔
364
                x_pos = x_image_list[i]
1✔
365
                y_pos = y_image_list[i]
1✔
366
                x_source, y_source = self._lensModel.ray_shooting(x_pos, y_pos, kwargs_lens)
1✔
367
                dist = np.sqrt((x_source - x_source[0]) ** 2 + (y_source - y_source[0]) ** 2)
1✔
368
                if np.max(dist) > tolerance:
1✔
369
                    return False
1✔
370
        return True
1✔
371

372
    def set_amplitudes(self, amp_list, kwargs_ps):
1✔
373
        """
374
        translates the amplitude parameters into the convention of the keyword argument list
375
        currently only used in SimAPI to transform magnitudes to amplitudes in the lenstronomy conventions
376

377
        :param amp_list: list of model amplitudes for each point source model
378
        :param kwargs_ps: list of point source keywords
379
        :return: overwrites kwargs_ps with new amplitudes
380
        """
381
        kwargs_list = copy.deepcopy(kwargs_ps)
1✔
382
        for i, model in enumerate(self.point_source_type_list):
1✔
383
            if self._flux_from_point_source_list[i]:
1✔
384
                amp = amp_list[i]
1✔
385
                if model == 'UNLENSED':
1✔
386
                    kwargs_list[i]['point_amp'] = amp
1✔
387
                elif model in ['LENSED_POSITION', 'SOURCE_POSITION']:
1✔
388
                    if self._fixed_magnification_list[i] is True:
1✔
389
                        kwargs_list[i]['source_amp'] = amp
1✔
390
                    else:
391
                        kwargs_list[i]['point_amp'] = amp
1✔
392
        return kwargs_list
1✔
393

394
    @classmethod
1✔
395
    def check_positive_flux(cls, kwargs_ps):
1✔
396
        """
397
        check whether inferred linear parameters are positive
398

399
        :param kwargs_ps: point source keyword argument list
400
        :return: bool, True, if all 'point_amp' parameters are positive semi-definite
401
        """
402
        pos_bool = True
1✔
403
        for kwargs in kwargs_ps:
1✔
404
            if 'point_amp' in kwargs:
1✔
405
                point_amp = kwargs['point_amp']
1✔
406
                if not np.all(point_amp >= 0):
1✔
407
                    pos_bool = False
1✔
408
                    break
1✔
409
            if 'source_amp' in kwargs:
1✔
410
                point_amp = kwargs['source_amp']
1✔
411
                if not np.all(point_amp >= 0):
1✔
412
                    pos_bool = False
1✔
413
                    break
1✔
414
        return pos_bool
1✔
415

416

417
def _sort_position_by_original(x_o, y_o, x_solved, y_solved):
1✔
418
    """
419
    sorting new image positions such that the old order is best preserved
420

421
    :param x_o: numpy array; original image positions
422
    :param y_o: numpy array; original image positions
423
    :param x_solved: numpy array; solved image positions with potentially more or fewer images
424
    :param y_solved: numpy array; solved image positions with potentially more or fewer images
425
    :return: sorted new image positions with the order best matching the original positions first,
426
     and then all other images in the same order as solved for
427
    """
428
    if len(x_o) > len(x_solved):
1✔
429
        # if new images are less , then return the original images (no sorting required)
430
        x_solved_new, y_solved_new = x_o, y_o
1✔
431
    else:
432
        x_solved_new, y_solved_new = [], []
1✔
433
        for i in range(len(x_o)):
1✔
434
            x, y = x_o[i], y_o[i]
1✔
435
            r2_i = (x - x_solved) ** 2 + (y - y_solved) ** 2
1✔
436
            # index of minimum radios
437
            index = np.argmin(r2_i)
1✔
438
            x_solved_new.append(x_solved[index])
1✔
439
            y_solved_new.append(y_solved[index])
1✔
440
            # delete this index
441
            x_solved = np.delete(x_solved, index)
1✔
442
            y_solved = np.delete(y_solved, index)
1✔
443
        # now we append the remaining additional images in the same order behind the original ones
444
        x_solved_new = np.append(np.array(x_solved_new), x_solved)
1✔
445
        y_solved_new = np.append(np.array(y_solved_new), y_solved)
1✔
446
    return x_solved_new, y_solved_new
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc