• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

XENONnT / straxen / 11356161288

15 Oct 2024 11:48PM UTC coverage: 90.274% (+0.02%) from 90.253%
11356161288

Pull #1451

github

web-flow
Merge 78c74e3ce into df0d4887f
Pull Request #1451: Adjust saving preference

8994 of 9963 relevant lines covered (90.27%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.09
/straxen/plugins/events/event_pattern_fit.py
1
import strax
2✔
2
import straxen
2✔
3
import numpy as np
2✔
4
import numba
2✔
5
from straxen.numbafied_scipy import numba_gammaln, numba_betainc
2✔
6
from scipy.special import loggamma
2✔
7

8
export, __all__ = strax.exporter()
2✔
9

10

11
@export
class EventPatternFit(strax.Plugin):
    """Plugin that provides pattern information for events.

    For the main and alternative S1/S2 of each event it computes:
    - modified-Poisson -2·log-likelihood values comparing the observed
      per-channel hit pattern with the optical-map expectation
      (``*_2llh``, ``s1_top_2llh``, ``s1_bottom_2llh``);
    - a data-driven (TensorFlow model) likelihood for S2 patterns
      (``*_neural_2llh``);
    - continuous and discrete binomial tests of the S1 area/photon
      fraction seen by the top array against the S1 AFT map prediction.
    """

    depends_on = ("event_area_per_channel", "event_basics", "event_positions")
    save_when = strax.SaveWhen.ALWAYS
    provides = "event_pattern_fit"
    __version__ = "0.1.3"

    # Getting S1 AFT maps
    s1_aft_map = straxen.URLConfig(
        default=(
            "itp_map://resource://cmt://s1_aft_xyz_map?version=ONLINE&run_id=plugin.run_id&fmt=json"
        ),
        cache=True,
    )

    electron_drift_velocity = straxen.URLConfig(
        default="cmt://electron_drift_velocity?version=ONLINE&run_id=plugin.run_id",
        cache=True,
        help="Vertical electron drift velocity in cm/ns (1e4 m/ms)",
    )

    electron_drift_time_gate = straxen.URLConfig(
        default="cmt://electron_drift_time_gate?version=ONLINE&run_id=plugin.run_id",
        help="Electron drift time from the gate in ns",
        cache=True,
    )

    s1_optical_map = straxen.URLConfig(
        help="S1 (x, y, z) optical/pattern map.",
        infer_type=False,
        default=(
            "itp_map://"
            "resource://"
            "XENONnT_s1_xyz_patterns_corrected_qes_MCva43fa9b_wires.pkl"
            "?fmt=pkl"
        ),
    )

    s2_optical_map = straxen.URLConfig(
        help="S2 (x, y) optical/pattern map.",
        infer_type=False,
        default=(
            "itp_map://"
            "resource://"
            "XENONnT_s2_xy_patterns_LCE_corrected_qes_MCva43fa9b_wires.pkl"
            "?fmt=pkl"
        ),
    )

    s2_tf_model = straxen.URLConfig(
        help="S2 (x, y) optical data-driven model",
        infer_type=False,
        default=(
            "tf://"
            "resource://"
            "XENONnT_s2_optical_map_data_driven_ML_v0_2021_11_25.tar.gz"
            "?custom_objects=plugin.s2_map_custom_objects"
            "&fmt=abs_path"
        ),
    )

    mean_pe_per_photon = straxen.URLConfig(
        help="Mean of full VUV single photon response",
        default=1.2,
        infer_type=False,
    )

    gain_model = straxen.URLConfig(
        infer_type=False, help="PMT gain model. Specify as (model_type, model_config)"
    )

    n_tpc_pmts = straxen.URLConfig(type=int, help="Number of TPC PMTs")

    n_top_pmts = straxen.URLConfig(type=int, help="Number of top TPC PMTs")

    s1_min_area_pattern_fit = straxen.URLConfig(
        infer_type=False,
        help="Skip EventPatternFit reconstruction if S1 area (PE) is less than this",
        default=2,
    )

    s2_min_area_pattern_fit = straxen.URLConfig(
        infer_type=False,
        help="Skip EventPatternFit reconstruction if S2 area (PE) is less than this",
        default=10,
    )

    store_per_channel = straxen.URLConfig(
        default=False, type=bool, help="Store normalized LLH per channel for each peak"
    )

    max_r_pattern_fit = straxen.URLConfig(
        default=straxen.tpc_r,
        type=float,
        help="Maximal radius of the peaks where llh calculation will be performed",
    )

    def infer_dtype(self):
        """Build the output dtype.

        Per-channel pattern/LLH fields are only included when
        ``store_per_channel`` is set, since they multiply the output size
        by the number of PMTs.
        """
        dtype = [
            ("s2_2llh", np.float32, "Modified Poisson likelihood value for main S2 in the event"),
            (
                "s2_neural_2llh",
                np.float32,
                "Data-driven based likelihood value for main S2 in the event",
            ),
            ("alt_s2_2llh", np.float32, "Modified Poisson likelihood value for alternative S2"),
            (
                "alt_s2_neural_2llh",
                np.float32,
                "Data-driven based likelihood value for alternative S2 in the event",
            ),
            ("s1_2llh", np.float32, "Modified Poisson likelihood value for main S1"),
            (
                "s1_top_2llh",
                np.float32,
                "Modified Poisson likelihood value for main S1, calculated from top array",
            ),
            (
                "s1_bottom_2llh",
                np.float32,
                "Modified Poisson likelihood value for main S1, calculated from bottom array",
            ),
            (
                "s1_area_fraction_top_continuous_probability",
                np.float32,
                "Continuous binomial test for S1 area fraction top",
            ),
            (
                "s1_area_fraction_top_discrete_probability",
                np.float32,
                "Discrete binomial test for S1 area fraction top",
            ),
            (
                "s1_photon_fraction_top_continuous_probability",
                np.float32,
                "Continuous binomial test for S1 photon fraction top",
            ),
            (
                "s1_photon_fraction_top_discrete_probability",
                np.float32,
                "Discrete binomial test for S1 photon fraction top",
            ),
            (
                "alt_s1_area_fraction_top_continuous_probability",
                np.float32,
                "Continuous binomial test for alternative S1 area fraction top",
            ),
            (
                "alt_s1_area_fraction_top_discrete_probability",
                np.float32,
                "Discrete binomial test for alternative S1 area fraction top",
            ),
            (
                "alt_s1_photon_fraction_top_continuous_probability",
                np.float32,
                "Continuous binomial test for alternative S1 photon fraction top",
            ),
            (
                "alt_s1_photon_fraction_top_discrete_probability",
                np.float32,
                "Discrete binomial test for alternative S1 photon fraction top",
            ),
        ]

        if self.store_per_channel:
            dtype += [
                (
                    ("2LLH per channel for main S2", "s2_2llh_per_channel"),
                    np.float32,
                    (self.n_top_pmts,),
                ),
                (
                    ("2LLH per channel for alternative S2", "alt_s2_2llh_per_channel"),
                    np.float32,
                    (self.n_top_pmts,),
                ),
                (("Pattern main S2", "s2_pattern"), np.float32, (self.n_top_pmts,)),
                (("Pattern alt S2", "alt_s2_pattern"), np.float32, (self.n_top_pmts,)),
                (("Pattern for main S1", "s1_pattern"), np.float32, (self.n_tpc_pmts,)),
                (
                    ("2LLH per channel for main S1", "s1_2llh_per_channel"),
                    np.float32,
                    (self.n_tpc_pmts,),
                ),
            ]
        dtype += strax.time_fields
        return dtype

    @property
    def s2_map_custom_objects(self):
        """Custom objects needed to deserialize the S2 TF model.

        The saved model references a custom loss by name, so loading it
        requires providing a callable under the same key.
        """

        def _logl_loss(patterns_true, likelihood):
            return likelihood / 10.0

        return {"_logl_loss": _logl_loss}

    def setup(self):
        """Resolve configs into working attributes and prepare the TF chi2 model."""
        # FIXME: Consider renaming the configs to match usage

        self.to_pe = self.gain_model

        self.mean_pe_photon = self.mean_pe_per_photon

        # Getting optical maps
        self.s1_pattern_map = self.s1_optical_map
        self.s2_pattern_map = self.s2_optical_map

        # Getting S2 data-driven tensorflow models
        self.model = self.s2_tf_model

        import tensorflow as tf

        # Sub-model that exposes the "Likelihood" layer output (chi2) directly
        self.model_chi2 = tf.keras.Model(
            self.model.inputs, self.model.get_layer("Likelihood").output
        )

        # Getting gain model to get dead PMTs
        self.dead_PMTs = np.where(self.to_pe == 0)[0]
        # NOTE(review): np.in1d is deprecated in NumPy >= 1.25 in favor of
        # np.isin -- consider migrating when the supported NumPy floor allows.
        self.pmtbool = ~np.in1d(np.arange(0, self.n_tpc_pmts), self.dead_PMTs)
        self.pmtbool_top = self.pmtbool[: self.n_top_pmts]
        self.pmtbool_bottom = self.pmtbool[self.n_top_pmts : self.n_tpc_pmts]

    def compute(self, events):
        """Fill all pattern-fit fields for a chunk of events.

        :param events: structured array with event_basics / positions /
            area-per-channel fields.
        :return: structured array matching ``self.dtype``.
        """
        result = np.zeros(len(events), dtype=self.dtype)
        result["time"] = events["time"]
        result["endtime"] = strax.endtime(events)

        # Computing LLH values for S1s
        self.compute_s1_llhvalue(events, result)

        # Computing LLH values for S2s
        self.compute_s2_llhvalue(events, result)

        # Computing chi2 values for S2s
        self.compute_s2_neural_llhvalue(events, result)

        # Computing binomial test for s1 area fraction top
        positions = np.vstack([events["x"], events["y"], events["z"]]).T
        aft_prob = self.s1_aft_map(positions)

        # For the alternative S1, re-derive z from its own drift time
        # (relative to the main S2) before evaluating the AFT map.
        alt_s1_interaction_drift_time = events["s2_center_time"] - events["alt_s1_center_time"]
        alt_s1_interaction_z = -self.electron_drift_velocity * (
            alt_s1_interaction_drift_time - self.electron_drift_time_gate
        )
        alt_positions = np.vstack([events["x"], events["y"], alt_s1_interaction_z]).T
        alt_aft_prob = self.s1_aft_map(alt_positions)

        # main s1 events: require valid AFT-map value, S1 area and S1 AFT
        mask_s1 = ~np.isnan(aft_prob)
        mask_s1 &= ~np.isnan(events["s1_area"])
        mask_s1 &= ~np.isnan(events["s1_area_fraction_top"])

        # default value is nan; it will be overwritten if the event satisfies the requirements
        result["s1_area_fraction_top_continuous_probability"][:] = np.nan
        result["s1_area_fraction_top_discrete_probability"][:] = np.nan
        result["s1_photon_fraction_top_continuous_probability"][:] = np.nan
        result["s1_photon_fraction_top_discrete_probability"][:] = np.nan

        # compute binomial test only if we have events that have valid aft prob, s1 area and s1 aft
        if np.sum(mask_s1):
            arg = (
                aft_prob[mask_s1],
                events["s1_area"][mask_s1],
                events["s1_area_fraction_top"][mask_s1],
            )
            result["s1_area_fraction_top_continuous_probability"][mask_s1] = (
                s1_area_fraction_top_probability(*arg)
            )
            result["s1_area_fraction_top_discrete_probability"][mask_s1] = (
                s1_area_fraction_top_probability(*arg, "discrete")
            )
            # Same test in photon space: scale area down by the mean PE per photon
            arg = (
                aft_prob[mask_s1],
                events["s1_area"][mask_s1] / self.mean_pe_per_photon,
                events["s1_area_fraction_top"][mask_s1],
            )
            result["s1_photon_fraction_top_continuous_probability"][mask_s1] = (
                s1_area_fraction_top_probability(*arg)
            )
            result["s1_photon_fraction_top_discrete_probability"][mask_s1] = (
                s1_area_fraction_top_probability(*arg, "discrete")
            )

        # alternative s1 events
        mask_alt_s1 = ~np.isnan(alt_aft_prob)
        mask_alt_s1 &= ~np.isnan(events["alt_s1_area"])
        mask_alt_s1 &= ~np.isnan(events["alt_s1_area_fraction_top"])

        # default value is nan; it will be overwritten if the event satisfies the requirements
        result["alt_s1_area_fraction_top_continuous_probability"][:] = np.nan
        result["alt_s1_area_fraction_top_discrete_probability"][:] = np.nan
        result["alt_s1_photon_fraction_top_continuous_probability"][:] = np.nan
        result["alt_s1_photon_fraction_top_discrete_probability"][:] = np.nan

        # compute binomial test only if we have events that have valid aft prob,
        # alt s1 area and alt s1 aft
        if np.sum(mask_alt_s1):
            arg = (
                alt_aft_prob[mask_alt_s1],
                events["alt_s1_area"][mask_alt_s1],
                events["alt_s1_area_fraction_top"][mask_alt_s1],
            )
            result["alt_s1_area_fraction_top_continuous_probability"][mask_alt_s1] = (
                s1_area_fraction_top_probability(*arg)
            )
            result["alt_s1_area_fraction_top_discrete_probability"][mask_alt_s1] = (
                s1_area_fraction_top_probability(*arg, "discrete")
            )
            arg = (
                alt_aft_prob[mask_alt_s1],
                events["alt_s1_area"][mask_alt_s1] / self.mean_pe_per_photon,
                events["alt_s1_area_fraction_top"][mask_alt_s1],
            )
            result["alt_s1_photon_fraction_top_continuous_probability"][mask_alt_s1] = (
                s1_area_fraction_top_probability(*arg)
            )
            result["alt_s1_photon_fraction_top_discrete_probability"][mask_alt_s1] = (
                s1_area_fraction_top_probability(*arg, "discrete")
            )

        return result

    def compute_s1_llhvalue(self, events, result):
        """Fill ``s1_2llh``, ``s1_top_2llh`` and ``s1_bottom_2llh`` in ``result``.

        The LLH is normalized by subtracting the likelihood of the observed
        pattern against itself, so a perfect match yields 0.
        """
        # Selecting S1s for pattern fit calculation
        # - must exist (index != -1)
        # - must have total area larger minimal one
        # - must have positive AFT
        x, y, z = events["x"], events["y"], events["z"]
        cur_s1_bool = events["s1_area"] > self.s1_min_area_pattern_fit
        cur_s1_bool &= events["s1_index"] != -1
        cur_s1_bool &= events["s1_area_fraction_top"] >= 0
        cur_s1_bool &= np.isfinite(x)
        cur_s1_bool &= np.isfinite(y)
        cur_s1_bool &= np.isfinite(z)
        cur_s1_bool &= (x**2 + y**2) < self.max_r_pattern_fit**2

        # default value is nan; it will be overwritten if the event satisfies the requirements
        result["s1_2llh"][:] = np.nan
        result["s1_top_2llh"][:] = np.nan
        result["s1_bottom_2llh"][:] = np.nan

        # Making expectation patterns [ in PE ]
        if np.sum(cur_s1_bool):
            # Map efficiencies at the event positions, restricted to live PMTs
            s1_map_effs = self.s1_pattern_map(np.array([x, y, z]).T)[cur_s1_bool, :]
            s1_area = events["s1_area"][cur_s1_bool]
            s1_pattern = (
                s1_area[:, None]
                * (s1_map_effs[:, self.pmtbool])
                / np.sum(s1_map_effs[:, self.pmtbool], axis=1)[:, None]
            )

            # Separate expectations for the top and bottom arrays,
            # each normalized within its own array
            s1_pattern_top = events["s1_area_fraction_top"][cur_s1_bool] * s1_area
            s1_pattern_top = s1_pattern_top[:, None] * (
                (s1_map_effs[:, : self.n_top_pmts])[:, self.pmtbool_top]
            )
            s1_pattern_top /= np.sum(
                (s1_map_effs[:, : self.n_top_pmts])[:, self.pmtbool_top], axis=1
            )[:, None]
            s1_pattern_bottom = (1 - events["s1_area_fraction_top"][cur_s1_bool]) * s1_area
            s1_pattern_bottom = s1_pattern_bottom[:, None] * (
                (s1_map_effs[:, self.n_top_pmts :])[:, self.pmtbool_bottom]
            )
            s1_pattern_bottom /= np.sum(
                (s1_map_effs[:, self.n_top_pmts :])[:, self.pmtbool_bottom], axis=1
            )[:, None]

            # Getting pattern from data
            s1_area_per_channel_ = events["s1_area_per_channel"][cur_s1_bool, :]
            s1_area_per_channel = s1_area_per_channel_[:, self.pmtbool]
            s1_area_per_channel_top = (s1_area_per_channel_[:, : self.n_top_pmts])[
                :, self.pmtbool_top
            ]
            s1_area_per_channel_bottom = (s1_area_per_channel_[:, self.n_top_pmts :])[
                :, self.pmtbool_bottom
            ]

            # Top and bottom
            arg1 = s1_pattern / self.mean_pe_photon, s1_area_per_channel, self.mean_pe_photon
            arg2 = (
                s1_area_per_channel / self.mean_pe_photon,
                s1_area_per_channel,
                self.mean_pe_photon,
            )
            norm_llh_val = neg2llh_modpoisson(*arg1) - neg2llh_modpoisson(*arg2)
            result["s1_2llh"][cur_s1_bool] = np.sum(norm_llh_val, axis=1)

            # If needed to store - store only top and bottom array, but not together
            if self.store_per_channel:
                # Storing pattern information
                store_patterns = np.zeros((s1_pattern.shape[0], self.n_tpc_pmts))
                store_patterns[:, self.pmtbool] = s1_pattern
                result["s1_pattern"][cur_s1_bool] = store_patterns
                # Storing actual LLH values
                store_2LLH_ch = np.zeros((norm_llh_val.shape[0], self.n_tpc_pmts))
                store_2LLH_ch[:, self.pmtbool] = norm_llh_val
                result["s1_2llh_per_channel"][cur_s1_bool] = store_2LLH_ch

            # Top
            arg1 = (
                s1_pattern_top / self.mean_pe_photon,
                s1_area_per_channel_top,
                self.mean_pe_photon,
            )
            arg2 = (
                s1_area_per_channel_top / self.mean_pe_photon,
                s1_area_per_channel_top,
                self.mean_pe_photon,
            )
            norm_llh_val = neg2llh_modpoisson(*arg1) - neg2llh_modpoisson(*arg2)
            result["s1_top_2llh"][cur_s1_bool] = np.sum(norm_llh_val, axis=1)

            # Bottom
            arg1 = (
                s1_pattern_bottom / self.mean_pe_photon,
                s1_area_per_channel_bottom,
                self.mean_pe_photon,
            )
            arg2 = (
                s1_area_per_channel_bottom / self.mean_pe_photon,
                s1_area_per_channel_bottom,
                self.mean_pe_photon,
            )
            norm_llh_val = neg2llh_modpoisson(*arg1) - neg2llh_modpoisson(*arg2)
            result["s1_bottom_2llh"][cur_s1_bool] = np.sum(norm_llh_val, axis=1)

    def compute_s2_llhvalue(self, events, result):
        """Fill ``s2_2llh`` and ``alt_s2_2llh`` (top array only) in ``result``."""
        for t_ in ["s2", "alt_s2"]:
            # Selecting S2s for pattern fit calculation
            # - must exist (index != -1)
            # - must have total area larger minimal one
            # - must have positive AFT
            x, y = events[t_ + "_x"], events[t_ + "_y"]
            s2_mask = events[t_ + "_area"] > self.s2_min_area_pattern_fit
            s2_mask &= events[t_ + "_area_fraction_top"] > 0
            s2_mask &= (x**2 + y**2) < self.max_r_pattern_fit**2

            # default value is nan; it will be overwritten if the event satisfies the requirements
            result[t_ + "_2llh"][:] = np.nan

            # Making expectation patterns [ in PE ]
            if np.sum(s2_mask):
                s2_map_effs = self.s2_pattern_map(np.array([x, y]).T)[s2_mask, 0 : self.n_top_pmts]
                s2_map_effs = s2_map_effs[:, self.pmtbool_top]
                s2_top_area = (events[t_ + "_area_fraction_top"] * events[t_ + "_area"])[s2_mask]
                s2_pattern = (
                    s2_top_area[:, None] * s2_map_effs / np.sum(s2_map_effs, axis=1)[:, None]
                )

                # Getting pattern from data
                s2_top_area_per_channel = events[t_ + "_area_per_channel"][
                    s2_mask, 0 : self.n_top_pmts
                ]
                s2_top_area_per_channel = s2_top_area_per_channel[:, self.pmtbool_top]

                # Calculating LLH, this is shifted Poisson
                # we get area expectation and we need to scale them to get
                # photon expectation
                norm_llh_val = neg2llh_modpoisson(
                    mu=s2_pattern / self.mean_pe_photon,
                    areas=s2_top_area_per_channel,
                    mean_pe_photon=self.mean_pe_photon,
                ) - neg2llh_modpoisson(
                    mu=s2_top_area_per_channel / self.mean_pe_photon,
                    areas=s2_top_area_per_channel,
                    mean_pe_photon=self.mean_pe_photon,
                )
                result[t_ + "_2llh"][s2_mask] = np.sum(norm_llh_val, axis=1)

                if self.store_per_channel:
                    store_patterns = np.zeros((s2_pattern.shape[0], self.n_top_pmts))
                    store_patterns[:, self.pmtbool_top] = s2_pattern
                    result[t_ + "_pattern"][s2_mask] = store_patterns

                    store_2LLH_ch = np.zeros((norm_llh_val.shape[0], self.n_top_pmts))
                    store_2LLH_ch[:, self.pmtbool_top] = norm_llh_val
                    result[t_ + "_2llh_per_channel"][s2_mask] = store_2LLH_ch

    def compute_s2_neural_llhvalue(self, events, result):
        """Fill ``s2_neural_2llh`` / ``alt_s2_neural_2llh`` from the TF chi2 model."""
        for t_ in ["s2", "alt_s2"]:
            x, y = events[t_ + "_x"], events[t_ + "_y"]
            s2_mask = events[t_ + "_area"] > self.s2_min_area_pattern_fit
            s2_mask &= events[t_ + "_area_fraction_top"] > 0

            # default value is nan; it will be overwritten if the event satisfies the requirements
            result[t_ + "_neural_2llh"][:] = np.nan

            # Produce position and top pattern to feed tensorflow model, return chi2/N
            if np.sum(s2_mask):
                s2_pos = np.stack((x, y)).T[s2_mask]
                s2_pat = events[t_ + "_area_per_channel"][s2_mask, 0 : self.n_top_pmts]
                # Output[0]: loss function, -2*log-likelihood, Output[1]: chi2
                result[t_ + "_neural_2llh"][s2_mask] = self.model_chi2.predict(
                    {"xx": s2_pos, "yy": s2_pat}, verbose=0
                )[1]
506

507

508
def neg2llh_modpoisson(mu=None, areas=None, mean_pe_photon=1.0):
    """Modified Poisson distribution with proper normalization for shifted Poisson.

    Returns -2*log-likelihood per channel for the observed areas given the
    expected photon counts.

    mu - expected number of photons per channel (array)
    areas - observed areas per channel (array, same shape as mu)
    mean_pe_photon - mean of area response for one photon

    """
    with np.errstate(divide="ignore", invalid="ignore"):
        # Effective (continuous) photon count inferred from the observed area
        fraction = areas / mean_pe_photon
        res = 2.0 * (
            mu - (fraction) * np.log(mu) + loggamma((fraction) + 1) + np.log(mean_pe_photon)
        )
    is_zero = areas <= 0  # If area equals or smaller than 0 - assume 0
    res[is_zero] = 2.0 * mu[is_zero]
    # if zero channel has negative expectation, assume LLH to be 0 there
    # this happens in the normalization factor calculation when mu is received from area
    # NOTE: intersection (&) is essential here -- a union (|) would also zero
    # out the 2*mu value just assigned to every zero-area channel above.
    neg_mu = mu < 0.0
    res[is_zero & neg_mu] = 0.0
    return res
528

529

530
# continuous and discrete binomial test
531

532

533
@numba.njit
def lbinom_pmf(k, n, p):
    """Log of the binomial pmf, continuous in k via the gamma function.

    Evaluates log[C(n, k) * p**k * (1 - p)**(n - k)] with the binomial
    coefficient expressed through log-gamma, so k and n may be non-integer.
    """
    log_coef = numba_gammaln(n + 1) - numba_gammaln(n - k + 1) - numba_gammaln(k + 1)
    return log_coef + k * np.log(p) + (n - k) * np.log(1 - p)
539

540

541
@numba.njit
def binom_pmf(k, n, p):
    """Binomial pmf (gamma-function approximation): exp of ``lbinom_pmf``."""
    log_pmf = lbinom_pmf(k, n, p)
    return np.exp(log_pmf)
545

546

547
@numba.njit
def binom_cdf(k, n, p):
    """Binomial CDF P(X <= k) via the regularized incomplete beta function.

    Returns exactly 1 once k reaches or exceeds n.
    """
    if k < n:
        return numba_betainc(n - k, k + 1, 1.0 - p)
    return 1.0
552

553

554
@numba.njit
def binom_sf(k, n, p):
    """Binomial survival function P(X > k): the complement of ``binom_cdf``."""
    cdf = binom_cdf(k, n, p)
    return 1 - cdf
557

558

559
@numba.njit
def lbinom_pmf_diriv(k, n, p, dk=1e-7):
    """Numerical derivative (in k) of the log binomial pmf.

    Uses a forward difference of step ``dk``; switches to a backward
    difference when the forward step would reach or pass k = n.
    """
    base = lbinom_pmf(k, n, p)
    if k + dk < n:
        return (lbinom_pmf(k + dk, n, p) - base) / dk
    return (base - lbinom_pmf(k - dk, n, p)) / dk
566

567

568
@numba.njit(cache=True)
def _numeric_derivative(y0, y1, err, target, x_min, x_max, x0, x1):
    """Take one secant-method step towards <target>.

    Returns ``(dx, x0, x1)``; ``dx == 0`` tells the caller to stop
    iterating, in which case both returned abscissas equal the endpoint
    whose value is closest to the target.
    """
    slope = y1 - y0
    if abs(slope) < err:
        # Flat within tolerance: signal termination via dx == 0 and keep
        # whichever endpoint lies closer to the target.
        best = x0 if abs(y0 - target) < abs(y1 - target) else x1
        return 0.0, best, best

    # Secant extrapolation, clamped into [x_min, x_max].
    x_new = (target - y0) / slope * (x1 - x0) + x0
    x_new = min(x_new, x_max)
    x_new = max(x_new, x_min)

    step = abs(x_new - x1)
    return step, x1, x_new
588

589

590
@numba.njit
def lbinom_pmf_mode(x_min, x_max, target, args, err=1e-7, max_iter=50):
    """Locate the mode of the log binomial pmf with the secant method.

    Finds the root of the derivative of ``lbinom_pmf`` within
    ``[x_min, x_max]``, stopping once the step drops below ``err`` or
    after ``max_iter`` iterations.
    """
    x0, x1 = x_min, x_max
    dx = abs(x1 - x0)
    remaining = max_iter

    while dx > err and remaining > 0:
        d0 = lbinom_pmf_diriv(x0, *args)
        d1 = lbinom_pmf_diriv(x1, *args)
        dx, x0, x1 = _numeric_derivative(d0, d1, err, target, x_min, x_max, x0, x1)
        remaining -= 1

    return x1
603

604

605
@numba.njit
def lbinom_pmf_inverse(x_min, x_max, target, args, err=1e-7, max_iter=50):
    """Find where the log binomial pmf crosses ``target`` with the secant method.

    Searches in ``[x_min, x_max]``; returns nan when the bracket is empty
    or when the search converges onto a boundary on the wrong side of the
    target (no crossing inside the interval).
    """
    x0 = x_min
    x1 = x_max
    dx = abs(x1 - x0)

    if dx != 0:
        # NOTE(review): y0 is only bound once the loop body runs; if the
        # initial dx is already <= err the boundary checks below would see
        # an undefined y0 -- presumably callers always pass a wide enough
        # bracket. TODO confirm.
        while (dx > err) and (max_iter > 0):
            y0 = lbinom_pmf(x0, *args)
            y1 = lbinom_pmf(x1, *args)
            dx, x0, x1 = _numeric_derivative(y0, y1, err, target, x_min, x_max, x0, x1)
            max_iter -= 1
        # Converged to the lower boundary (k = 0) while still above target:
        # no crossing exists in the interval.
        if x0 == x1 == 0 and y0 - target > 0:
            x1 = np.nan
        # Converged to the upper boundary (k = n, args[0]) while still below
        # target: likewise no crossing.
        if x0 == x1 == args[0] and y0 - target < 0:
            x1 = np.nan
    else:
        x1 = np.nan

    return x1
626

627

628
@numba.njit
def binom_test(k, n, p):
    """The main purpose of this algorithm is to find the value j on the other side of the mode that
    has the same probability as k, and integrate the tails outward from k and j.

    In the case where either k or j are zero, only the non-zero tail is integrated.

    """
    # Mode of the (continuous) log-pmf; target is the pmf value at the observed k.
    mode = lbinom_pmf_mode(0, n, 0, (n, p))
    distance = abs(mode - k)
    target = lbinom_pmf(k, n, p)

    if k < mode:
        # Observed value left of the mode: search for the matching point j
        # on the right side, within twice the distance from the mode.
        j_min = mode
        j_max = min(mode + 2.0 * distance, n)
        j = lbinom_pmf_inverse(j_min, j_max, target, (n, p))
        ls, rs = k, j
    elif k > mode:
        # Observed value right of the mode: search left for the match.
        j_min = max(mode - 2.0 * distance, 0)
        j_max = mode
        j = lbinom_pmf_inverse(j_min, j_max, target, (n, p))
        ls, rs = j, k
    else:
        # k is exactly at the mode: both tails cover everything.
        return 1

    # Sum the two tails; a nan from lbinom_pmf_inverse means that tail
    # has no matching point and is skipped.
    pval = 0
    if not np.isnan(ls):
        pval += binom_cdf(ls, n, p)
    if not np.isnan(rs):
        pval += binom_sf(rs, n, p)
        if np.isnan(ls):
            # Only the right tail contributes: include the pmf at rs itself,
            # since the survival function is strictly P(X > rs).
            pval += binom_pmf(rs, n, p)

    return pval
662

663

664
@np.vectorize
@numba.njit
def s1_area_fraction_top_probability(aft_prob, area_tot, area_fraction_top, mode="continuous"):
    """Function to compute the S1 AFT probability.

    Binomial test of the observed top-array fraction against the AFT-map
    expectation. ``mode="continuous"`` runs the two-sided continuous test
    (``binom_test``); any other value evaluates the discrete pmf at the
    observed point (``binom_pmf``). Vectorized element-wise over the inputs.
    """
    area_top = area_tot * area_fraction_top

    # Raise a warning in case one of these three conditions is violated
    # and return a binomial test equal to nan since they are not physical.
    # k: size_top, n: size_tot, p: aft_prob
    do_test = True
    if area_tot < area_top:
        # warnings.warn(f'n {area_tot} must be >= k {area_top}')
        binomial_test = np.nan
        do_test = False
    if (aft_prob > 1.0) or (aft_prob < 0.0):
        # warnings.warn(f'p {aft_prob} must be in range [0, 1]')
        binomial_test = np.nan
        do_test = False
    if area_top < 0:
        # warnings.warn(f'k {area_top} must be >= 0')
        binomial_test = np.nan
        do_test = False

    if do_test:
        if mode == "discrete":
            binomial_test = binom_pmf(area_top, area_tot, aft_prob)
            # TODO:
            # binomial_test = binomtest(
            #     k=round(area_top), n=round(area_tot), p=aft_prob, alternative="two-sided"
            # ).pvalue
        else:
            binomial_test = binom_test(area_top, area_tot, aft_prob)

    return binomial_test
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc