• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icecube / flarestack / 8250807569

12 Mar 2024 02:42PM UTC coverage: 90.112% (+15.4%) from 74.702%
8250807569

Pull #355

github

web-flow
Merge ee343c671 into 0d8e01697
Pull Request #355: Transition to Coveralls github-action

4830 of 5360 relevant lines covered (90.11%)

2.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.67
/flarestack/core/angular_error_modifier.py
1
import logging
3✔
2
import numpy as np
3✔
3
import os
3✔
4
from flarestack.core.energy_pdf import EnergyPDF
3✔
5
from flarestack.shared import (
3✔
6
    min_angular_err,
7
    base_floor_quantile,
8
    floor_pickle,
9
    pull_pickle,
10
    weighted_quantile,
11
)
12
from flarestack.utils.dynamic_pull_correction import (
3✔
13
    create_quantile_floor_0d,
14
    create_quantile_floor_0d_e,
15
    create_quantile_floor_1d,
16
    create_quantile_floor_1d_e,
17
    create_pull_0d_e,
18
    create_pull_1d,
19
    create_pull_1d_e,
20
    create_pull_2d,
21
    create_pull_2d_e,
22
)
23
import pickle as Pickle
3✔
24
from scipy.interpolate import interp1d, RectBivariateSpline
3✔
25
from flarestack.utils.make_SoB_splines import (
3✔
26
    get_gamma_support_points,
27
    get_gamma_precision,
28
    _around,
29
)
30
import numexpr
3✔
31
import inspect
3✔
32

33
logger = logging.getLogger(__name__)
3✔
34

35

36
class BaseFloorClass(object):
3✔
37
    subclasses: dict[str, object] = {}
3✔
38

39
    def __init__(self, floor_dict):
3✔
40
        self.floor_dict = floor_dict
3✔
41
        self.season = floor_dict["season"]
3✔
42
        self.pickle_name = floor_pickle(floor_dict)
3✔
43

44
    @classmethod
3✔
45
    def register_subclass(cls, floor_name):
3✔
46
        """Adds a new subclass of BaseFloorClass, with class name equal to
47
        "floor_name".
48
        """
49

50
        def decorator(subclass):
3✔
51
            cls.subclasses[floor_name] = subclass
3✔
52
            return subclass
3✔
53

54
        return decorator
3✔
55

56
    @classmethod
3✔
57
    def create(cls, floor_dict):
3✔
58
        floor_name = floor_dict["floor_name"]
3✔
59

60
        if floor_name not in cls.subclasses:
3✔
61
            raise ValueError("Bad floor name {}".format(floor_name))
×
62

63
        return cls.subclasses[floor_name](floor_dict)
3✔
64

65
    def floor(self, data):
3✔
66
        return np.array([0.0 for _ in data])
×
67

68
    def apply_floor(self, data):
3✔
69
        mask = data["raw_sigma"] < self.floor(data)
3✔
70
        new_data = data.copy()
3✔
71
        new_data["sigma"][mask] = np.sqrt(
3✔
72
            self.floor(data[mask].copy()) ** 2.0 + data["raw_sigma"][mask].copy() ** 2.0
3✔
73
        )
74
        return new_data
3✔
75

76
    def apply_dynamic(self, data):
3✔
77
        return data
×
78

79
    def apply_static(self, data):
3✔
80
        return data
×
81

82

83
@BaseFloorClass.register_subclass("no_floor")
3✔
84
class NoFloor(BaseFloorClass):
3✔
85
    pass
3✔
86

87

88
class BaseStaticFloor(BaseFloorClass):
3✔
89
    """Class that enables the application of a static floor. Rewrites the
90
    apply_static method to update the "sigma" field AND update the "raw
91
    sigma" field. This means that the floor is applied once, and then is
92
    permanently in effect. It will run faster because it will not be
93
    reapplied in each iteration.
94
    """
95

96
    def apply_static(self, data):
3✔
97
        data = self.apply_floor(data)
3✔
98
        data["raw_sigma"] = data["sigma"]
3✔
99
        return data
3✔
100

101

102
class BaseDynamicFloorClass(BaseFloorClass):
3✔
103
    """Class that enables the application of a dynamic floor. Rewrites the
104
    apply_dynamic method to update only the "sigma" field". This means that
105
    the floor is applied for each iteration. It will run slower because it will
106
    be reapplied in each iteration.
107
    """
108

109
    def apply_dynamic(self, data):
3✔
110
        return self.floor(data)
×
111

112

113
@BaseFloorClass.register_subclass("static_floor")
3✔
114
class StaticFloor(BaseStaticFloor):
3✔
115
    def __init__(self, floor_dict):
3✔
116
        BaseFloorClass.__init__(self, floor_dict)
3✔
117

118
        try:
3✔
119
            self.min_error = np.deg2rad(floor_dict["min_error_deg"])
3✔
120
        except KeyError:
3✔
121
            self.min_error = min_angular_err
3✔
122

123
        logger.debug(
3✔
124
            "Applying an angular error floor of {0} degrees".format(
3✔
125
                np.degrees(self.min_error)
3✔
126
            )
127
        )
128

129
    def floor(self, data):
3✔
130
        return np.array([self.min_error for _ in data])
3✔
131

132

133
class BaseQuantileFloor(BaseFloorClass):
3✔
134
    def __init__(self, floor_dict):
3✔
135
        try:
×
136
            self.floor_quantile = floor_dict["floor_quantile"]
×
137
        except KeyError:
×
138
            self.floor_quantile = base_floor_quantile
×
139
            floor_dict["floor_quantile"] = self.floor_quantile
×
140

141
        BaseFloorClass.__init__(self, floor_dict)
×
142

143
        logger.debug(
144
            "Applying an angular error floor using quantile {0}".format(
145
                self.floor_quantile
146
            )
147
        )
148

149
        if not os.path.isfile(self.pickle_name):
×
150
            self.create_pickle()
×
151
        else:
×
152
            logger.debug(f"Loading from {self.pickle_name}")
×
153

154
        with open(self.pickle_name, "r") as f:
×
155
            pickled_data = Pickle.load(f)
×
156

157
        self.f = self.create_function(pickled_data)
×
158

159
    def create_pickle(self):
3✔
160
        pass
×
161

162
    def create_function(self, pickled_array):
3✔
163
        pass
×
164

165

166
@BaseFloorClass.register_subclass("quantile_floor_0d")
3✔
167
class QuantileFloor0D(BaseQuantileFloor, BaseStaticFloor):
3✔
168
    def create_pickle(self):
3✔
169
        create_quantile_floor_0d(self.floor_dict)
×
170

171
    def create_function(self, pickled_array):
3✔
172
        return lambda data, params: np.array([pickled_array for _ in data])
×
173

174

175
@BaseFloorClass.register_subclass("quantile_floor_0d_e")
3✔
176
class QuantileFloorEParam0D(BaseQuantileFloor, BaseDynamicFloorClass):
3✔
177
    def create_pickle(self):
3✔
178
        create_quantile_floor_0d_e(self.floor_dict)
×
179

180
    def create_function(self, pickled_array):
3✔
181
        func = interp1d(pickled_array[0], pickled_array[1])
×
182
        return lambda data, params: np.array([func(params) for _ in data])
×
183

184

185
"""
186
# This class has been commented out since its name is redefined below.
187
# However, it seems QuantileFloor1D is never explicitly called in the code and is hence likely untested.
188
@BaseFloorClass.register_subclass("quantile_floor_1d")
189
class QuantileFloor1D(BaseQuantileFloor, BaseStaticFloor):
190
    def create_pickle(self):
191
        create_quantile_floor_1d(self.floor_dict)
192

193
    def create_function(self, pickled_array):
194
        func = interp1d(pickled_array[0], pickled_array[1])
195
        return lambda data, params: func(data["logE"])
196
"""
197

198

199
@BaseFloorClass.register_subclass("quantile_floor_1d_e")
3✔
200
class QuantileFloor1D(BaseQuantileFloor, BaseDynamicFloorClass):
3✔
201
    def create_pickle(self):
3✔
202
        create_quantile_floor_1d_e(self.floor_dict)
×
203

204
    def create_function(self, pickled_array):
3✔
205
        func = RectBivariateSpline(
206
            pickled_array[0],
207
            pickled_array[1],
208
            np.log(pickled_array[2]),
209
            kx=1,
210
            ky=1,
211
            s=0,
212
        )
213
        return lambda data, params: np.array(
214
            [np.exp(func(x["logE"], params[0])[0]) for x in data]
215
        ).T
216

217

218
class BaseAngularErrorModifier(object):
3✔
219
    subclasses: dict[str, object] = {}
3✔
220

221
    def __init__(self, pull_dict):
3✔
222
        self.season = pull_dict["season"]
3✔
223
        self.floor = BaseFloorClass.create(pull_dict)
3✔
224
        self.pull_dict = pull_dict
3✔
225
        self.pull_name = pull_pickle(pull_dict)
3✔
226

227
        # precision in gamma
228
        self.precision = pull_dict.get("gamma_precision", "flarestack")
3✔
229

230
    @classmethod
3✔
231
    def register_subclass(cls, aem_name):
3✔
232
        """Adds a new subclass of BaseAngularErrorModifier,
233
        with class name equal to aem_name.
234

235
        :param aem_name: AngularErrorModifier name
236
        :return: AngularErrorModifier object
237
        """
238

239
        def decorator(subclass):
3✔
240
            cls.subclasses[aem_name] = subclass
3✔
241
            return subclass
3✔
242

243
        return decorator
3✔
244

245
    @classmethod
3✔
246
    def create(
3✔
247
        cls,
248
        season,
249
        e_pdf_dict,
250
        floor_name="static_floor",
2✔
251
        aem_name="no_modifier",
2✔
252
        **kwargs,
253
    ):
254
        pull_dict = dict()
3✔
255
        pull_dict["season"] = season
3✔
256
        pull_dict["e_pdf_dict"] = e_pdf_dict
3✔
257
        pull_dict["floor_name"] = floor_name
3✔
258
        pull_dict["aem_name"] = aem_name
3✔
259
        pull_dict.update(kwargs)
3✔
260

261
        if aem_name not in cls.subclasses:
3✔
262
            raise ValueError("Bad pull name {}".format(aem_name))
263

264
        return cls.subclasses[aem_name](pull_dict)
3✔
265

266
    def pull_correct(self, data, params):
3✔
267
        return data
268

269
    def pull_correct_static(self, data):
3✔
270
        data = self.floor.apply_static(data)
3✔
271
        return data
3✔
272

273
    def pull_correct_dynamic(self, data, params):
3✔
274
        data = self.floor.apply_dynamic(data, params)
275
        return data
276

277
    def create_spatial_cache(self, cut_data, SoB_pdf):
3✔
278
        if len(inspect.getfullargspec(SoB_pdf)[0]) == 2:
3✔
279
            SoB = dict()
3✔
280
            for gamma in get_gamma_support_points(precision=self.precision):
3✔
281
                SoB[gamma] = np.log(SoB_pdf(cut_data, gamma))
3✔
282
        else:
283
            SoB = SoB_pdf(cut_data)
3✔
284
        return SoB
3✔
285

286
    def estimate_spatial(self, gamma, spatial_cache):
3✔
287
        if isinstance(spatial_cache, dict):
3✔
288
            return self.estimate_spatial_dynamic(gamma, spatial_cache)
3✔
289
        else:
290
            return spatial_cache
3✔
291

292
    def estimate_spatial_dynamic(self, gamma, spatial_cache):
3✔
293
        """Quickly estimates the value of pull for Gamma.
294
        Uses pre-calculated values for first and second derivatives.
295
        Uses a Taylor series to estimate S(gamma), unless pull has already
296
        been calculated for a given gamma.
297

298
        :param gamma: Spectral Index
299
        :param spatial_cache: Median Pull cache
300
        :return: Estimated value for S(gamma)
301
        """
302
        if gamma in list(spatial_cache.keys()):
3✔
303
            val = np.exp(spatial_cache[gamma])
3✔
304
            # val = spatial_cache[gamma]
305
        else:
306
            g1 = _around(gamma, self.precision)
3✔
307
            dg = get_gamma_precision(self.precision)
3✔
308

309
            g0 = _around(g1 - dg, self.precision)
3✔
310
            g2 = _around(g1 + dg, self.precision)
3✔
311

312
            # Uses Numexpr to quickly estimate S(gamma)
313

314
            S0 = spatial_cache[g0]
3✔
315
            S1 = spatial_cache[g1]
3✔
316
            S2 = spatial_cache[g2]
3✔
317

318
            val = numexpr.evaluate(
3✔
319
                "exp((S0 - 2.*S1 + S2) / (2. * dg**2) * (gamma - g1)**2"
3✔
320
                + " + (S2 -S0) / (2. * dg) * (gamma - g1) + S1)"
321
            )
322
            # val = numexpr.evaluate(
323
            #     "((S0 - 2.*S1 + S2) / (2. * dg**2) * (gamma - g1)**2" + \
324
            #     " + (S2 -S0) / (2. * dg) * (gamma - g1) + S1)"
325
            # )
326

327
        return val
3✔
328

329

330
@BaseAngularErrorModifier.register_subclass("no_pull")
3✔
331
class NoPull(BaseAngularErrorModifier):
3✔
332
    pass
3✔
333

334

335
class BaseMedianAngularErrorModifier(BaseAngularErrorModifier):
3✔
336
    def __init__(self, pull_dict):
3✔
337
        BaseAngularErrorModifier.__init__(self, pull_dict)
338

339
        if not os.path.isfile(self.pull_name):
340
            self.create_pickle()
341
        else:
342
            logger.debug(f"Loading from {self.pull_name}")
343

344
        with open(self.pull_name, "r") as f:
345
            self.pickled_data = Pickle.load(f)
346

347
    def pull_correct(self, f, data):
3✔
348
        data["sigma"] = np.exp(f(data)) * data["raw_sigma"]
349
        return data
350

351
    def create_pickle(self):
3✔
352
        pass
353

354
    def create_static(self):
3✔
355
        return lambda data: np.array([1.0 for _ in data])
356

357
    def create_dynamic(self, pickled_array):
3✔
358
        return lambda data: np.array([1.0 for _ in data])
359

360

361
class StaticMedianPullCorrector(BaseMedianAngularErrorModifier):
3✔
362
    def __init__(self, pull_dict):
3✔
363
        BaseMedianAngularErrorModifier.__init__(self, pull_dict)
364

365
        self.static_f = self.create_static()
366

367
    def pull_correct_static(self, data):
3✔
368
        data = self.floor.apply_static(data)
369
        data = self.pull_correct(self.static_f, data)
370

371
        data["raw_sigma"] = data["sigma"]
372

373
        return data
374

375

376
class DynamicMedianPullCorrector(BaseMedianAngularErrorModifier):
3✔
377
    def __init__(self, pull_dict):
3✔
378
        BaseAngularErrorModifier.__init__(self, pull_dict)
379

380
    def estimate_spatial(self, gamma, spatial_cache):
3✔
381
        return self.estimate_spatial_dynamic(gamma, spatial_cache)
382

383
    def pull_correct_dynamic(self, data, param):
3✔
384
        data = self.floor.apply_dynamic(data)
385
        f = self.create_dynamic(self.pickled_data[param])
386
        data["sigma"] = np.exp(f(data)) * data["raw_sigma"]
387
        return data
388

389
    def create_spatial_cache(self, cut_data, SoB_pdf):
3✔
390
        """Evaluates the median pull values for all coincidentdata. For each
391
        value of gamma in self.gamma_support_points, calculates
392
        the Log(Signal/Background) values for the coincident data. Then saves
393
        each weight array to a dictionary.
394

395
        :param cut_data: Subset of the data containing only coincident events
396
        :return: Dictionary containing SoB values for each event for each
397
        gamma value.
398
        """
399

400
        spatial_cache = dict()
401

402
        for key in sorted(self.pickled_data.keys()):
403
            cut_data = self.pull_correct_dynamic(cut_data, key)
404

405
            # If gamma is needed to evaluate spatial PDF (say because you
406
            # have overlapping PDFs and you need to know the weights,
407
            # then you pass the key. Otherwise just evaluate as normal.
408

409
            if len(inspect.getargspec(SoB_pdf)[0]) == 2:
410
                SoB = SoB_pdf(cut_data, key)
411
            else:
412
                SoB = SoB_pdf(cut_data)
413

414
            spatial_cache[key] = np.log(SoB)
415

416
        return spatial_cache
417

418

419
@BaseAngularErrorModifier.register_subclass("median_0d_e")
3✔
420
class MedianPullEParam0D(DynamicMedianPullCorrector):
3✔
421
    def create_pickle(self):
3✔
422
        create_pull_0d_e(self.pull_dict)
423

424
    def create_dynamic(self, pickled_array):
3✔
425
        return lambda data: np.array([pickled_array for _ in data])
426

427

428
@BaseAngularErrorModifier.register_subclass("median_1d")
3✔
429
class MedianPull1D(StaticMedianPullCorrector):
3✔
430
    def create_pickle(self):
3✔
431
        create_pull_1d(self.pull_dict)
432

433
    def create_static(self):
3✔
434
        func = interp1d(self.pickled_data[0], self.pickled_data[1])
435
        return lambda data: func(data["logE"])
436

437

438
@BaseAngularErrorModifier.register_subclass("median_1d_e")
3✔
439
class MedianPullEParam1D(DynamicMedianPullCorrector):
3✔
440
    def create_pickle(self):
3✔
441
        create_pull_1d_e(self.pull_dict)
442

443
    def create_dynamic(self, pickled_array):
3✔
444
        func = interp1d(pickled_array[0], pickled_array[1])
445
        return lambda data: func(data["logE"])
446

447

448
@BaseAngularErrorModifier.register_subclass("median_2d")
3✔
449
class MedianPull2D(StaticMedianPullCorrector):
3✔
450
    def create_pickle(self):
3✔
451
        create_pull_2d(self.pull_dict)
452

453
    def create_static(self):
3✔
454
        func = RectBivariateSpline(
455
            self.pickled_data[0], self.pickled_data[1], self.pickled_data[2]
456
        )
457

458
        return lambda data: [func(x["logE"], x["sinDec"])[0][0] for x in data]
459
        # return lambda data:
460

461

462
@BaseAngularErrorModifier.register_subclass("median_2d_e")
3✔
463
class MedianPullEParam2D(DynamicMedianPullCorrector):
3✔
464
    def create_pickle(self):
3✔
465
        create_pull_2d_e(self.pull_dict)
466

467
    def create_dynamic(self, pickled_array):
3✔
468
        func = RectBivariateSpline(pickled_array[0], pickled_array[1], pickled_array[2])
469

470
        return lambda data: func(data["logE"], data["sinDec"])
471

472

473
"""
474
# NOTE: `mypy` allowed to find some deprecations in the following __main__ block that were never noticed before, because the block is not part of any testing routine. It should be checked whether equivalent code is covered by an unit test, before removing the (now commented) block altogether.
475

476
if __name__ == "__main__":
477
    from flarestack.data.icecube.ps_tracks.ps_v002_p01 import IC86_1_dict
478
    from flarestack.analyses.angular_error_floor.plot_bias import (
479
        get_data,
480
        weighted_quantile,
481
    )
482
    from scipy.stats import norm
483

484
    print(norm.cdf(1.0))
485

486
    def symmetric_gauss(sigma):
487
        return 1 - 2 * norm.sf(sigma)
488

489
    def gauss_2d(sigma):
490
        # return symmetric_gauss(sigma) ** 2
491
        return symmetric_gauss(sigma)
492

493
    print(symmetric_gauss(1.0))
494
    print(gauss_2d(1.0))
495
    print(gauss_2d(1.177))
496

497
    e_pdf_dict = {"Name": "Power Law", "Gamma": 3.0}
498

499
    e_pdf = EnergyPDF.create(e_pdf_dict)
500

501
    pc = BaseAngularErrorModifier.create(
502
        IC86_1_dict, e_pdf_dict, "no_floor", "median_2d"
503
    )
504
    mc, x, y = get_data(IC86_1_dict)[:10]
505

506
    pulls = x / y
507

508
    weights = e_pdf.weight_mc(mc)
509

510
    median_pull = weighted_quantile(pulls, 0.5, weights)
511

512
    def med_pull(data):
513
        y = np.degrees(data["sigma"])
514
        pulls = x / y
515
        med = weighted_quantile(pulls, 0.5, weights)
516
        return med
517

518
    print(mc["sigma"][:5])
519

520
    mc = pc.pull_correct_static(mc)
521

522
    print(mc["sigma"][:5])
523

524
    print(med_pull(mc))
525

526
    print(median_pull)
527

528
# @BaseAngularErrorModifier.register_subclass('static_pull_corrector')
529
# class Static1DPullCorrector(BaseAngularErrorModifier):
530
#
531
#     def __init__(self, season, e_pdf_dict, **kwargs):
532
#         BaseAngularErrorModifier.__init__(self, season, e_pdf_dict)
533

534

535
# x = BaseFloorClass.create(IC86_1_dict, e_pdf_dict, "quantile_floor_1d_e",
536
#                           floor_quantile=0.1)
537
# for gamma in np.linspace(1.0, 4.0, 4):
538
#     print data_loader(IC86_1_dict["exp_path"])["logE"][:8]
539
#     print np.degrees(x.f(data_loader(IC86_1_dict["exp_path"])[:8], [gamma]))
540
"""
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc