• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

XENONnT / straxen / 6353397986

29 Sep 2023 03:02PM UTC coverage: 93.724% (+0.05%) from 93.673%
6353397986

Pull #1240

github

web-flow
Merge fa9f911eb into 9bf6192f0
Pull Request #1240: Proposal to use pre-commit for continuous integration

2924 of 2924 new or added lines in 106 files covered. (100.0%)

8751 of 9337 relevant lines covered (93.72%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.02
/straxen/plugins/events/event_pattern_fit.py
1
import strax
2✔
2
import straxen
2✔
3
import numpy as np
2✔
4
import numba
2✔
5
from straxen.numbafied_scipy import numba_gammaln, numba_betainc
2✔
6
from scipy.special import loggamma
2✔
7

8
export, __all__ = strax.exporter()
2✔
9

10

11
@export
2✔
12
class EventPatternFit(strax.Plugin):
2✔
13
    """Plugin that provides patter information for events."""
14

15
    depends_on = ("event_area_per_channel", "event_basics", "event_positions")
2✔
16
    provides = "event_pattern_fit"
2✔
17
    __version__ = "0.1.3"
2✔
18

19
    # Getting S1 AFT maps
20
    s1_aft_map = straxen.URLConfig(
2✔
21
        default="itp_map://resource://cmt://"
22
        "s1_aft_xyz_map"
23
        "?version=ONLINE&run_id=plugin.run_id&fmt=json",
24
        cache=True,
25
    )
26

27
    electron_drift_velocity = straxen.URLConfig(
2✔
28
        default="cmt://electron_drift_velocity?version=ONLINE&run_id=plugin.run_id",
29
        cache=True,
30
        help="Vertical electron drift velocity in cm/ns (1e4 m/ms)",
31
    )
32

33
    electron_drift_time_gate = straxen.URLConfig(
2✔
34
        default="cmt://electron_drift_time_gate?version=ONLINE&run_id=plugin.run_id",
35
        help="Electron drift time from the gate in ns",
36
        cache=True,
37
    )
38

39
    s1_optical_map = straxen.URLConfig(
2✔
40
        help="S1 (x, y, z) optical/pattern map.",
41
        infer_type=False,
42
        default="itp_map://"
43
        "resource://"
44
        "XENONnT_s1_xyz_patterns_corrected_qes_MCva43fa9b_wires.pkl"
45
        "?fmt=pkl",
46
    )
47

48
    s2_optical_map = straxen.URLConfig(
2✔
49
        help="S2 (x, y) optical/pattern map.",
50
        infer_type=False,
51
        default="itp_map://"
52
        "resource://"
53
        "XENONnT_s2_xy_patterns_LCE_corrected_qes_MCva43fa9b_wires.pkl"
54
        "?fmt=pkl",
55
    )
56

57
    s2_tf_model = straxen.URLConfig(
2✔
58
        help="S2 (x, y) optical data-driven model",
59
        infer_type=False,
60
        default="tf://"
61
        "resource://"
62
        "XENONnT_s2_optical_map_data_driven_ML_v0_2021_11_25.tar.gz"
63
        "?custom_objects=plugin.s2_map_custom_objects"
64
        "&fmt=abs_path",
65
    )
66

67
    mean_pe_per_photon = straxen.URLConfig(
2✔
68
        help="Mean of full VUV single photon response",
69
        default=1.2,
70
        infer_type=False,
71
    )
72

73
    gain_model = straxen.URLConfig(
2✔
74
        infer_type=False, help="PMT gain model. Specify as (model_type, model_config)"
75
    )
76

77
    n_tpc_pmts = straxen.URLConfig(type=int, help="Number of TPC PMTs")
2✔
78

79
    n_top_pmts = straxen.URLConfig(type=int, help="Number of top TPC PMTs")
2✔
80

81
    s1_min_area_pattern_fit = straxen.URLConfig(
2✔
82
        infer_type=False,
83
        help="Skip EventPatternFit reconstruction if S1 area (PE) is less than this",
84
        default=2,
85
    )
86

87
    s2_min_area_pattern_fit = straxen.URLConfig(
2✔
88
        infer_type=False,
89
        help="Skip EventPatternFit reconstruction if S2 area (PE) is less than this",
90
        default=10,
91
    )
92

93
    store_per_channel = straxen.URLConfig(
2✔
94
        default=False, type=bool, help="Store normalized LLH per channel for each peak"
95
    )
96

97
    max_r_pattern_fit = straxen.URLConfig(
2✔
98
        default=straxen.tpc_r,
99
        type=float,
100
        help="Maximal radius of the peaks where llh calculation will be performed",
101
    )
102

103
    def infer_dtype(self):
2✔
104
        dtype = [
2✔
105
            ("s2_2llh", np.float32, "Modified Poisson likelihood value for main S2 in the event"),
106
            (
107
                "s2_neural_2llh",
108
                np.float32,
109
                "Data-driven based likelihood value for main S2 in the event",
110
            ),
111
            ("alt_s2_2llh", np.float32, "Modified Poisson likelihood value for alternative S2"),
112
            (
113
                "alt_s2_neural_2llh",
114
                np.float32,
115
                "Data-driven based likelihood value for alternative S2 in the event",
116
            ),
117
            ("s1_2llh", np.float32, "Modified Poisson likelihood value for main S1"),
118
            (
119
                "s1_top_2llh",
120
                np.float32,
121
                "Modified Poisson likelihood value for main S1, calculated from top array",
122
            ),
123
            (
124
                "s1_bottom_2llh",
125
                np.float32,
126
                "Modified Poisson likelihood value for main S1, calculated from bottom array",
127
            ),
128
            (
129
                "s1_area_fraction_top_continuous_probability",
130
                np.float32,
131
                "Continuous binomial test for S1 area fraction top",
132
            ),
133
            (
134
                "s1_area_fraction_top_discrete_probability",
135
                np.float32,
136
                "Discrete binomial test for S1 area fraction top",
137
            ),
138
            (
139
                "s1_photon_fraction_top_continuous_probability",
140
                np.float32,
141
                "Continuous binomial test for S1 photon fraction top",
142
            ),
143
            (
144
                "s1_photon_fraction_top_discrete_probability",
145
                np.float32,
146
                "Discrete binomial test for S1 photon fraction top",
147
            ),
148
            (
149
                "alt_s1_area_fraction_top_continuous_probability",
150
                np.float32,
151
                "Continuous binomial test for alternative S1 area fraction top",
152
            ),
153
            (
154
                "alt_s1_area_fraction_top_discrete_probability",
155
                np.float32,
156
                "Discrete binomial test for alternative S1 area fraction top",
157
            ),
158
            (
159
                "alt_s1_photon_fraction_top_continuous_probability",
160
                np.float32,
161
                "Continuous binomial test for alternative S1 photon fraction top",
162
            ),
163
            (
164
                "alt_s1_photon_fraction_top_discrete_probability",
165
                np.float32,
166
                "Discrete binomial test for alternative S1 photon fraction top",
167
            ),
168
        ]
169

170
        if self.store_per_channel:
2✔
171
            dtype += [
2✔
172
                (
173
                    ("2LLH per channel for main S2", "s2_2llh_per_channel"),
174
                    np.float32,
175
                    (self.n_top_pmts,),
176
                ),
177
                (
178
                    ("2LLH per channel for alternative S2", "alt_s2_2llh_per_channel"),
179
                    np.float32,
180
                    (self.n_top_pmts,),
181
                ),
182
                (("Pattern main S2", "s2_pattern"), np.float32, (self.n_top_pmts,)),
183
                (("Pattern alt S2", "alt_s2_pattern"), np.float32, (self.n_top_pmts,)),
184
                (("Pattern for main S1", "s1_pattern"), np.float32, (self.n_tpc_pmts,)),
185
                (
186
                    ("2LLH per channel for main S1", "s1_2llh_per_channel"),
187
                    np.float32,
188
                    (self.n_tpc_pmts,),
189
                ),
190
            ]
191
        dtype += strax.time_fields
2✔
192
        return dtype
2✔
193

194
    @property
2✔
195
    def s2_map_custom_objects(self):
2✔
196
        def _logl_loss(patterns_true, likelihood):
2✔
197
            return likelihood / 10.0
×
198

199
        return {"_logl_loss": _logl_loss}
2✔
200

201
    def setup(self):
2✔
202
        # FIXME: Consider renaming the configs to match usage
203

204
        self.to_pe = self.gain_model
2✔
205

206
        self.mean_pe_photon = self.mean_pe_per_photon
2✔
207

208
        # Getting optical maps
209
        self.s1_pattern_map = self.s1_optical_map
2✔
210
        self.s2_pattern_map = self.s2_optical_map
2✔
211

212
        # Getting S2 data-driven tensorflow models
213
        self.model = self.s2_tf_model
2✔
214

215
        import tensorflow as tf
2✔
216

217
        self.model_chi2 = tf.keras.Model(
2✔
218
            self.model.inputs, self.model.get_layer("Likelihood").output
219
        )
220

221
        # Getting gain model to get dead PMTs
222
        self.dead_PMTs = np.where(self.to_pe == 0)[0]
2✔
223
        self.pmtbool = ~np.in1d(np.arange(0, self.n_tpc_pmts), self.dead_PMTs)
2✔
224
        self.pmtbool_top = self.pmtbool[: self.n_top_pmts]
2✔
225
        self.pmtbool_bottom = self.pmtbool[self.n_top_pmts : self.n_tpc_pmts]
2✔
226

227
    def compute(self, events):
2✔
228
        result = np.zeros(len(events), dtype=self.dtype)
2✔
229
        result["time"] = events["time"]
2✔
230
        result["endtime"] = strax.endtime(events)
2✔
231

232
        # Computing LLH values for S1s
233
        self.compute_s1_llhvalue(events, result)
2✔
234

235
        # Computing LLH values for S2s
236
        self.compute_s2_llhvalue(events, result)
2✔
237

238
        # Computing chi2 values for S2s
239
        self.compute_s2_neural_llhvalue(events, result)
2✔
240

241
        # Computing binomial test for s1 area fraction top
242
        positions = np.vstack([events["x"], events["y"], events["z"]]).T
2✔
243
        aft_prob = self.s1_aft_map(positions)
2✔
244

245
        alt_s1_interaction_drift_time = events["s2_center_time"] - events["alt_s1_center_time"]
2✔
246
        alt_s1_interaction_z = -self.electron_drift_velocity * (
2✔
247
            alt_s1_interaction_drift_time - self.electron_drift_time_gate
248
        )
249
        alt_positions = np.vstack([events["x"], events["y"], alt_s1_interaction_z]).T
2✔
250
        alt_aft_prob = self.s1_aft_map(alt_positions)
2✔
251

252
        # main s1 events
253
        mask_s1 = ~np.isnan(aft_prob)
2✔
254
        mask_s1 &= ~np.isnan(events["s1_area"])
2✔
255
        mask_s1 &= ~np.isnan(events["s1_area_fraction_top"])
2✔
256

257
        # default value is nan, it will be overwrite if the event satisfy the requirements
258
        result["s1_area_fraction_top_continuous_probability"][:] = np.nan
2✔
259
        result["s1_area_fraction_top_discrete_probability"][:] = np.nan
2✔
260
        result["s1_photon_fraction_top_continuous_probability"][:] = np.nan
2✔
261
        result["s1_photon_fraction_top_discrete_probability"][:] = np.nan
2✔
262

263
        # compute binomial test only if we have events that have valid aft prob, s1 area and s1 aft
264
        if np.sum(mask_s1):
2✔
265
            arg = (
2✔
266
                aft_prob[mask_s1],
267
                events["s1_area"][mask_s1],
268
                events["s1_area_fraction_top"][mask_s1],
269
            )
270
            result["s1_area_fraction_top_continuous_probability"][
2✔
271
                mask_s1
272
            ] = s1_area_fraction_top_probability(*arg)
273
            result["s1_area_fraction_top_discrete_probability"][
2✔
274
                mask_s1
275
            ] = s1_area_fraction_top_probability(*arg, "discrete")
276
            arg = (
2✔
277
                aft_prob[mask_s1],
278
                events["s1_area"][mask_s1] / self.mean_pe_per_photon,
279
                events["s1_area_fraction_top"][mask_s1],
280
            )
281
            result["s1_photon_fraction_top_continuous_probability"][
2✔
282
                mask_s1
283
            ] = s1_area_fraction_top_probability(*arg)
284
            result["s1_photon_fraction_top_discrete_probability"][
2✔
285
                mask_s1
286
            ] = s1_area_fraction_top_probability(*arg, "discrete")
287

288
        # alternative s1 events
289
        mask_alt_s1 = ~np.isnan(alt_aft_prob)
2✔
290
        mask_alt_s1 &= ~np.isnan(events["alt_s1_area"])
2✔
291
        mask_alt_s1 &= ~np.isnan(events["alt_s1_area_fraction_top"])
2✔
292

293
        # default value is nan, it will be ovewrite if the event satisfy the requirments
294
        result["alt_s1_area_fraction_top_continuous_probability"][:] = np.nan
2✔
295
        result["alt_s1_area_fraction_top_discrete_probability"][:] = np.nan
2✔
296
        result["alt_s1_photon_fraction_top_continuous_probability"][:] = np.nan
2✔
297
        result["alt_s1_photon_fraction_top_discrete_probability"][:] = np.nan
2✔
298

299
        # compute binomial test only if we have events that have valid aft prob, alt s1 area and alt s1 aft
300
        if np.sum(mask_alt_s1):
2✔
301
            arg = (
×
302
                alt_aft_prob[mask_alt_s1],
303
                events["alt_s1_area"][mask_alt_s1],
304
                events["alt_s1_area_fraction_top"][mask_alt_s1],
305
            )
306
            result["alt_s1_area_fraction_top_continuous_probability"][
×
307
                mask_alt_s1
308
            ] = s1_area_fraction_top_probability(*arg)
309
            result["alt_s1_area_fraction_top_discrete_probability"][
×
310
                mask_alt_s1
311
            ] = s1_area_fraction_top_probability(*arg, "discrete")
312
            arg = (
×
313
                alt_aft_prob[mask_alt_s1],
314
                events["alt_s1_area"][mask_alt_s1] / self.mean_pe_per_photon,
315
                events["alt_s1_area_fraction_top"][mask_alt_s1],
316
            )
317
            result["alt_s1_photon_fraction_top_continuous_probability"][
×
318
                mask_alt_s1
319
            ] = s1_area_fraction_top_probability(*arg)
320
            result["alt_s1_photon_fraction_top_discrete_probability"][
×
321
                mask_alt_s1
322
            ] = s1_area_fraction_top_probability(*arg, "discrete")
323

324
        return result
2✔
325

326
    def compute_s1_llhvalue(self, events, result):
2✔
327
        # Selecting S1s for pattern fit calculation
328
        # - must exist (index != -1)
329
        # - must have total area larger minimal one
330
        # - must have positive AFT
331
        x, y, z = events["x"], events["y"], events["z"]
2✔
332
        cur_s1_bool = events["s1_area"] > self.s1_min_area_pattern_fit
2✔
333
        cur_s1_bool &= events["s1_index"] != -1
2✔
334
        cur_s1_bool &= events["s1_area_fraction_top"] >= 0
2✔
335
        cur_s1_bool &= np.isfinite(x)
2✔
336
        cur_s1_bool &= np.isfinite(y)
2✔
337
        cur_s1_bool &= np.isfinite(z)
2✔
338
        cur_s1_bool &= (x**2 + y**2) < self.max_r_pattern_fit**2
2✔
339

340
        # default value is nan, it will be ovewrite if the event satisfy the requirments
341
        result["s1_2llh"][:] = np.nan
2✔
342
        result["s1_top_2llh"][:] = np.nan
2✔
343
        result["s1_bottom_2llh"][:] = np.nan
2✔
344

345
        # Making expectation patterns [ in PE ]
346
        if np.sum(cur_s1_bool):
2✔
347
            s1_map_effs = self.s1_pattern_map(np.array([x, y, z]).T)[cur_s1_bool, :]
2✔
348
            s1_area = events["s1_area"][cur_s1_bool]
2✔
349
            s1_pattern = (
2✔
350
                s1_area[:, None]
351
                * (s1_map_effs[:, self.pmtbool])
352
                / np.sum(s1_map_effs[:, self.pmtbool], axis=1)[:, None]
353
            )
354

355
            s1_pattern_top = events["s1_area_fraction_top"][cur_s1_bool] * s1_area
2✔
356
            s1_pattern_top = s1_pattern_top[:, None] * (
2✔
357
                (s1_map_effs[:, : self.n_top_pmts])[:, self.pmtbool_top]
358
            )
359
            s1_pattern_top /= np.sum(
2✔
360
                (s1_map_effs[:, : self.n_top_pmts])[:, self.pmtbool_top], axis=1
361
            )[:, None]
362
            s1_pattern_bottom = (1 - events["s1_area_fraction_top"][cur_s1_bool]) * s1_area
2✔
363
            s1_pattern_bottom = s1_pattern_bottom[:, None] * (
2✔
364
                (s1_map_effs[:, self.n_top_pmts :])[:, self.pmtbool_bottom]
365
            )
366
            s1_pattern_bottom /= np.sum(
2✔
367
                (s1_map_effs[:, self.n_top_pmts :])[:, self.pmtbool_bottom], axis=1
368
            )[:, None]
369

370
            # Getting pattern from data
371
            s1_area_per_channel_ = events["s1_area_per_channel"][cur_s1_bool, :]
2✔
372
            s1_area_per_channel = s1_area_per_channel_[:, self.pmtbool]
2✔
373
            s1_area_per_channel_top = (s1_area_per_channel_[:, : self.n_top_pmts])[
2✔
374
                :, self.pmtbool_top
375
            ]
376
            s1_area_per_channel_bottom = (s1_area_per_channel_[:, self.n_top_pmts :])[
2✔
377
                :, self.pmtbool_bottom
378
            ]
379

380
            # Top and bottom
381
            arg1 = s1_pattern / self.mean_pe_photon, s1_area_per_channel, self.mean_pe_photon
2✔
382
            arg2 = (
2✔
383
                s1_area_per_channel / self.mean_pe_photon,
384
                s1_area_per_channel,
385
                self.mean_pe_photon,
386
            )
387
            norm_llh_val = neg2llh_modpoisson(*arg1) - neg2llh_modpoisson(*arg2)
2✔
388
            result["s1_2llh"][cur_s1_bool] = np.sum(norm_llh_val, axis=1)
2✔
389

390
            # If needed to stire - store only top and bottom array, but not together
391
            if self.store_per_channel:
2✔
392
                # Storring pattern information
393
                store_patterns = np.zeros((s1_pattern.shape[0], self.n_tpc_pmts))
2✔
394
                store_patterns[:, self.pmtbool] = s1_pattern
2✔
395
                result["s1_pattern"][cur_s1_bool] = store_patterns
2✔
396
                # Storing actual LLH values
397
                store_2LLH_ch = np.zeros((norm_llh_val.shape[0], self.n_tpc_pmts))
2✔
398
                store_2LLH_ch[:, self.pmtbool] = norm_llh_val
2✔
399
                result["s1_2llh_per_channel"][cur_s1_bool] = store_2LLH_ch
2✔
400

401
            # Top
402
            arg1 = (
2✔
403
                s1_pattern_top / self.mean_pe_photon,
404
                s1_area_per_channel_top,
405
                self.mean_pe_photon,
406
            )
407
            arg2 = (
2✔
408
                s1_area_per_channel_top / self.mean_pe_photon,
409
                s1_area_per_channel_top,
410
                self.mean_pe_photon,
411
            )
412
            norm_llh_val = neg2llh_modpoisson(*arg1) - neg2llh_modpoisson(*arg2)
2✔
413
            result["s1_top_2llh"][cur_s1_bool] = np.sum(norm_llh_val, axis=1)
2✔
414

415
            # Bottom
416
            arg1 = (
2✔
417
                s1_pattern_bottom / self.mean_pe_photon,
418
                s1_area_per_channel_bottom,
419
                self.mean_pe_photon,
420
            )
421
            arg2 = (
2✔
422
                s1_area_per_channel_bottom / self.mean_pe_photon,
423
                s1_area_per_channel_bottom,
424
                self.mean_pe_photon,
425
            )
426
            norm_llh_val = neg2llh_modpoisson(*arg1) - neg2llh_modpoisson(*arg2)
2✔
427
            result["s1_bottom_2llh"][cur_s1_bool] = np.sum(norm_llh_val, axis=1)
2✔
428

429
    def compute_s2_llhvalue(self, events, result):
2✔
430
        for t_ in ["s2", "alt_s2"]:
2✔
431
            # Selecting S2s for pattern fit calculation
432
            # - must exist (index != -1)
433
            # - must have total area larger minimal one
434
            # - must have positive AFT
435
            x, y = events[t_ + "_x"], events[t_ + "_y"]
2✔
436
            s2_mask = events[t_ + "_area"] > self.s2_min_area_pattern_fit
2✔
437
            s2_mask &= events[t_ + "_area_fraction_top"] > 0
2✔
438
            s2_mask &= (x**2 + y**2) < self.max_r_pattern_fit**2
2✔
439

440
            # default value is nan, it will be ovewrite if the event satisfy the requirments
441
            result[t_ + "_2llh"][:] = np.nan
2✔
442

443
            # Making expectation patterns [ in PE ]
444
            if np.sum(s2_mask):
2✔
445
                s2_map_effs = self.s2_pattern_map(np.array([x, y]).T)[s2_mask, 0 : self.n_top_pmts]
2✔
446
                s2_map_effs = s2_map_effs[:, self.pmtbool_top]
2✔
447
                s2_top_area = (events[t_ + "_area_fraction_top"] * events[t_ + "_area"])[s2_mask]
2✔
448
                s2_pattern = (
2✔
449
                    s2_top_area[:, None] * s2_map_effs / np.sum(s2_map_effs, axis=1)[:, None]
450
                )
451

452
                # Getting pattern from data
453
                s2_top_area_per_channel = events[t_ + "_area_per_channel"][
2✔
454
                    s2_mask, 0 : self.n_top_pmts
455
                ]
456
                s2_top_area_per_channel = s2_top_area_per_channel[:, self.pmtbool_top]
2✔
457

458
                # Calculating LLH, this is shifted Poisson
459
                # we get area expectation and we need to scale them to get
460
                # photon expectation
461
                norm_llh_val = neg2llh_modpoisson(
2✔
462
                    mu=s2_pattern / self.mean_pe_photon,
463
                    areas=s2_top_area_per_channel,
464
                    mean_pe_photon=self.mean_pe_photon,
465
                ) - neg2llh_modpoisson(
466
                    mu=s2_top_area_per_channel / self.mean_pe_photon,
467
                    areas=s2_top_area_per_channel,
468
                    mean_pe_photon=self.mean_pe_photon,
469
                )
470
                result[t_ + "_2llh"][s2_mask] = np.sum(norm_llh_val, axis=1)
2✔
471

472
                if self.store_per_channel:
2✔
473
                    store_patterns = np.zeros((s2_pattern.shape[0], self.n_top_pmts))
2✔
474
                    store_patterns[:, self.pmtbool_top] = s2_pattern
2✔
475
                    result[t_ + "_pattern"][s2_mask] = store_patterns  #:s2_pattern[s2_mask]
2✔
476

477
                    store_2LLH_ch = np.zeros((norm_llh_val.shape[0], self.n_top_pmts))
2✔
478
                    store_2LLH_ch[:, self.pmtbool_top] = norm_llh_val
2✔
479
                    result[t_ + "_2llh_per_channel"][s2_mask] = store_2LLH_ch
2✔
480

481
    def compute_s2_neural_llhvalue(self, events, result):
2✔
482
        for t_ in ["s2", "alt_s2"]:
2✔
483
            x, y = events[t_ + "_x"], events[t_ + "_y"]
2✔
484
            s2_mask = events[t_ + "_area"] > self.s2_min_area_pattern_fit
2✔
485
            s2_mask &= events[t_ + "_area_fraction_top"] > 0
2✔
486

487
            # default value is nan, it will be ovewrite if the event satisfy the requirements
488
            result[t_ + "_neural_2llh"][:] = np.nan
2✔
489

490
            # Produce position and top pattern to feed tensorflow model, return chi2/N
491
            if np.sum(s2_mask):
2✔
492
                s2_pos = np.stack((x, y)).T[s2_mask]
2✔
493
                s2_pat = events[t_ + "_area_per_channel"][s2_mask, 0 : self.n_top_pmts]
2✔
494
                # Output[0]: loss function, -2*log-likelihood, Output[1]: chi2
495
                result[t_ + "_neural_2llh"][s2_mask] = self.model_chi2.predict(
2✔
496
                    {"xx": s2_pos, "yy": s2_pat}, verbose=0
497
                )[1]
498

499

500
def neg2llh_modpoisson(mu=None, areas=None, mean_pe_photon=1.0):
2✔
501
    """Modified poisson distribution with proper normalization for shifted poisson.
502

503
    mu - expected number of photons per channel
504
    areas  - observed areas per channel
505
    mean_pe_photon - mean of area responce for one photon
506

507
    """
508
    with np.errstate(divide="ignore", invalid="ignore"):
2✔
509
        fraction = areas / mean_pe_photon
2✔
510
        res = 2.0 * (
2✔
511
            mu - (fraction) * np.log(mu) + loggamma((fraction) + 1) + np.log(mean_pe_photon)
512
        )
513
    is_zero = areas <= 0  # If area equals or smaller than 0 - assume 0
2✔
514
    res[is_zero] = 2.0 * mu[is_zero]
2✔
515
    # if zero channel has negative expectation, assume LLH to be 0 there
516
    # this happens in the normalization factor calculation when mu is received from area
517
    neg_mu = mu < 0.0
2✔
518
    res[is_zero | neg_mu] = 0.0
2✔
519
    return res
2✔
520

521

522
# continuous and discrete binomial test
523

524

525
@numba.njit
2✔
526
def lbinom_pmf(k, n, p):
2✔
527
    """Log of binomial probability mass function approximated with gamma function."""
528
    scale_log = numba_gammaln(n + 1) - numba_gammaln(n - k + 1) - numba_gammaln(k + 1)
2✔
529
    ret_log = scale_log + k * np.log(p) + (n - k) * np.log(1 - p)
2✔
530
    return ret_log
2✔
531

532

533
@numba.njit
2✔
534
def binom_pmf(k, n, p):
2✔
535
    """Binomial probability mass function approximated with gamma function."""
536
    return np.exp(lbinom_pmf(k, n, p))
2✔
537

538

539
@numba.njit
2✔
540
def binom_cdf(k, n, p):
2✔
541
    if k >= n:
2✔
542
        return 1.0
2✔
543
    return numba_betainc(n - k, k + 1, 1.0 - p)
2✔
544

545

546
@numba.njit
2✔
547
def binom_sf(k, n, p):
2✔
548
    return 1 - binom_cdf(k, n, p)
2✔
549

550

551
@numba.njit
2✔
552
def lbinom_pmf_diriv(k, n, p, dk=1e-7):
2✔
553
    """Numerical dirivitive of Binomial pmf approximated with gamma function."""
554
    if k + dk < n:
2✔
555
        return (lbinom_pmf(k + dk, n, p) - lbinom_pmf(k, n, p)) / dk
2✔
556
    else:
557
        return (lbinom_pmf(k - dk, n, p) - lbinom_pmf(k, n, p)) / -dk
2✔
558

559

560
@numba.njit(cache=True)
2✔
561
def _numeric_derivative(y0, y1, err, target, x_min, x_max, x0, x1):
2✔
562
    """Get close to <target> by doing a numeric derivative."""
563
    if abs(y1 - y0) < err:
2✔
564
        # break by passing dx == 0
565
        return 0.0, x1, x1
2✔
566

567
    x = (target - y0) / (y1 - y0) * (x1 - x0) + x0
2✔
568
    x = min(x, x_max)
2✔
569
    x = max(x, x_min)
2✔
570

571
    dx = abs(x - x1)
2✔
572
    x0 = x1
2✔
573
    x1 = x
2✔
574

575
    return dx, x0, x1
2✔
576

577

578
@numba.njit
2✔
579
def lbinom_pmf_mode(x_min, x_max, target, args, err=1e-7, max_iter=50):
2✔
580
    """Find the root of the derivative of log Binomial pmf with secant method."""
581
    x0 = x_min
2✔
582
    x1 = x_max
2✔
583
    dx = abs(x1 - x0)
2✔
584

585
    while (dx > err) and (max_iter > 0):
2✔
586
        y0 = lbinom_pmf_diriv(x0, *args)
2✔
587
        y1 = lbinom_pmf_diriv(x1, *args)
2✔
588
        dx, x0, x1 = _numeric_derivative(y0, y1, err, target, x_min, x_max, x0, x1)
2✔
589
        max_iter -= 1
2✔
590
    return x1
2✔
591

592

593
@numba.njit
2✔
594
def lbinom_pmf_inverse(x_min, x_max, target, args, err=1e-7, max_iter=50):
2✔
595
    """Find the where the log Binomial pmf cross target with secant method."""
596
    x0 = x_min
2✔
597
    x1 = x_max
2✔
598
    dx = abs(x1 - x0)
2✔
599

600
    if dx != 0:
2✔
601
        while (dx > err) and (max_iter > 0):
2✔
602
            y0 = lbinom_pmf(x0, *args)
2✔
603
            y1 = lbinom_pmf(x1, *args)
2✔
604
            dx, x0, x1 = _numeric_derivative(y0, y1, err, target, x_min, x_max, x0, x1)
2✔
605
            max_iter -= 1
2✔
606
        if x0 == x1 == 0 and y0 - target > 0:
2✔
607
            x1 = np.nan
2✔
608
        if x0 == x1 == args[0] and y0 - target < 0:
2✔
609
            x1 = np.nan
×
610
    else:
611
        x1 = np.nan
2✔
612

613
    return x1
2✔
614

615

616
@numba.njit
2✔
617
def binom_test(k, n, p):
2✔
618
    """The main purpose of this algorithm is to find the value j on the other side of the mode that
619
    has the same probability as k, and integrate the tails outward from k and j.
620

621
    In the case where either k or j are zero, only the non-zero tail is integrated.
622

623
    """
624
    mode = lbinom_pmf_mode(0, n, 0, (n, p))
2✔
625
    distance = abs(mode - k)
2✔
626
    target = lbinom_pmf(k, n, p)
2✔
627

628
    if k < mode:
2✔
629
        j_min = mode
2✔
630
        j_max = min(mode + 2.0 * distance, n)
2✔
631
        j = lbinom_pmf_inverse(j_min, j_max, target, (n, p))
2✔
632
        ls, rs = k, j
2✔
633
    elif k > mode:
2✔
634
        j_min = max(mode - 2.0 * distance, 0)
2✔
635
        j_max = mode
2✔
636
        j = lbinom_pmf_inverse(j_min, j_max, target, (n, p))
2✔
637
        ls, rs = j, k
2✔
638
    else:
639
        return 1
2✔
640

641
    pval = 0
2✔
642
    if not np.isnan(ls):
2✔
643
        pval += binom_cdf(ls, n, p)
2✔
644
    if not np.isnan(rs):
2✔
645
        pval += binom_sf(rs, n, p)
2✔
646
        if np.isnan(ls):
2✔
647
            pval += binom_pmf(rs, n, p)
2✔
648

649
    return pval
2✔
650

651

652
@np.vectorize
2✔
653
@numba.njit
2✔
654
def s1_area_fraction_top_probability(aft_prob, area_tot, area_fraction_top, mode="continuous"):
2✔
655
    """Function to compute the S1 AFT probability."""
656
    area_top = area_tot * area_fraction_top
2✔
657

658
    # Raise a warning in case one of these three condition is verified
659
    # and return binomial test equal to nan since they are not physical
660
    # k: size_top, n: size_tot, p: aft_prob
661
    do_test = True
2✔
662
    if area_tot < area_top:
2✔
663
        # warnings.warn(f'n {area_tot} must be >= k {area_top}')
664
        binomial_test = np.nan
×
665
        do_test = False
×
666
    if (aft_prob > 1.0) or (aft_prob < 0.0):
2✔
667
        # warnings.warn(f'p {aft_prob} must be in range [0, 1]')
668
        binomial_test = np.nan
×
669
        do_test = False
×
670
    if area_top < 0:
2✔
671
        # warnings.warn(f'k {area_top} must be >= 0')
672
        binomial_test = np.nan
×
673
        do_test = False
×
674

675
    if do_test:
2✔
676
        if mode == "discrete":
2✔
677
            binomial_test = binom_pmf(area_top, area_tot, aft_prob)
2✔
678
            # TODO:
679
            # binomial_test = binomtest(k=round(area_top), n=round(area_tot), p=aft_prob, alternative='two-sided').pvalue
680
        else:
681
            binomial_test = binom_test(area_top, area_tot, aft_prob)
2✔
682

683
    return binomial_test
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc