• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

XENONnT / straxen / 10168216310

30 Jul 2024 07:02PM UTC coverage: 90.957% (-0.2%) from 91.135%
10168216310

Pull #1401

github

web-flow
Merge d7f13cc2a into 4d6c2b7ba
Pull Request #1401: Dynamic led window

50 of 69 new or added lines in 2 files covered. (72.46%)

6 existing lines in 2 files now uncovered.

8992 of 9886 relevant lines covered (90.96%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.88
/straxen/plugins/events_nv/events_nv.py
1
import strax
2✔
2
import straxen
2✔
3
import numpy as np
2✔
4
import numba
2✔
5
import typing as ty
2✔
6
from immutabledict import immutabledict
2✔
7

8
# Set up the ``export`` decorator applied to the public definitions below,
# together with this module's ``__all__``.
# NOTE(review): presumably ``export`` appends decorated names to ``__all__`` —
# confirm against the strax documentation.
export, __all__ = strax.exporter()
2✔
9

10

11
@export
class nVETOEvents(strax.OverlapWindowPlugin):
    """Plugin which builds veto events from ``hitlets_nv``.

    Event boundaries come from ``find_veto_events``; the per-event properties
    are filled in afterwards by ``compute_nveto_event_properties``.
    """

    __version__ = "0.0.3"

    depends_on = "hitlets_nv"
    provides = "events_nv"
    data_kind = "events_nv"
    compressor = "zstd"
    # Number of events emitted so far; used as offset for the event numbers.
    events_seen = 0

    event_left_extension_nv = straxen.URLConfig(
        default=0,
        track=True,
        type=int,
        help="Extends event window this many [ns] to the left.",
    )

    event_resolving_time_nv = straxen.URLConfig(
        default=200,
        track=True,
        type=int,
        help="Resolving time for window coincidence [ns].",
    )

    event_min_hits_nv = straxen.URLConfig(
        default=3,
        track=True,
        type=int,
        help="Minimum number of fully confined hitlets to define an event.",
    )

    channel_map = straxen.URLConfig(
        track=False,
        type=immutabledict,
        help="immutabledict mapping subdetector to (min, max) channel number",
    )

    def infer_dtype(self):
        """Return the event dtype and store the channel layout on the plugin.

        Sets ``name_event_number``, ``channel_range`` (the "nveto" entry of
        ``channel_map``) and ``n_channel`` as a side effect.
        """
        self.name_event_number = "event_number_nv"
        self.channel_range = self.channel_map["nveto"]
        first_channel = self.channel_range[0]
        last_channel = self.channel_range[1]
        # Both range ends are counted, hence the +1.
        self.n_channel = last_channel - first_channel + 1
        return veto_event_dtype(self.name_event_number, self.n_channel)

    def get_window_size(self):
        """Return left extension + resolving time + 1 as the window size."""
        return 1 + self.event_resolving_time_nv + self.event_left_extension_nv

    def compute(self, hitlets_nv, start, end):
        """Build the veto events for one chunk of hitlets.

        :param hitlets_nv: Hitlets from which the events are built.
        :param start: Lower chunk boundary; event times are clipped to it.
        :param end: Upper chunk boundary; event times are clipped to it.
        :return: Array of events with event numbers and properties filled in.
        """
        events, hitlet_ranges = find_veto_events(
            hitlets_nv,
            self.event_min_hits_nv,
            self.event_resolving_time_nv,
            self.event_left_extension_nv,
            event_number_key=self.name_event_number,
            n_channel=self.n_channel,
        )

        # Properties only need to be computed if at least one event was found.
        if len(hitlet_ranges) > 0:
            compute_nveto_event_properties(
                events,
                hitlets_nv,
                hitlet_ranges,
                start_channel=self.channel_range[0],
            )

        # Hand out consecutive event numbers, continuing from previous calls.
        first_number = self.events_seen
        n_new = len(events)
        events[self.name_event_number] = first_number + np.arange(n_new)
        self.events_seen = first_number + n_new

        # Don't extend beyond the chunk boundaries. This will often happen for
        # events near the invalid boundary of the overlap processing (which
        # should be thrown away).
        for field in ("time", "endtime"):
            events[field] = np.clip(events[field], start, end)
        return events
79

80

81
def veto_event_dtype(name_event_number: str = "event_number_nv", n_pmts: int = 120) -> list:
    """Return the dtype description of a veto event.

    :param name_event_number: Field name used for the event number.
    :param n_pmts: Length of the ``area_per_channel`` array field.
    :return: List of field specifications: ``strax.time_fields`` followed by
        the event-level fields defined here.

    """
    event_fields = [
        (("Veto event number in this dataset", name_event_number), np.int64),
        (("Total area of all hitlets in event [pe]", "area"), np.float32),
        (("Total number of hitlets in events", "n_hits"), np.int32),
        (("Total number of contributing channels", "n_contributing_pmt"), np.uint8),
        (("Area in event per channel [pe]", "area_per_channel"), np.float32, n_pmts),
        (
            (
                "Area weighted mean time of the event relative to the event start [ns]",
                "center_time",
            ),
            np.float32,
        ),
        (("Weighted variance of time [ns]", "center_time_spread"), np.float32),
    ]
    # Build a fresh list so that strax.time_fields itself is never modified.
    return list(strax.time_fields) + event_fields
100

101

102
@numba.njit(cache=True, nogil=True)
def compute_nveto_event_properties(
    events: np.ndarray,
    hitlets: np.ndarray,
    contained_hitlets_ids: np.ndarray,
    start_channel: int = 2000,
):
    """Fill the property fields of the veto events in place.

    For every event the fields ``area``, ``n_hits``, ``n_contributing_pmt`` and
    ``area_per_channel`` are written. ``center_time`` and ``center_time_spread``
    are only written if the summed area of the event is non-zero.

    :param events: Events whose properties are computed; modified in place.
    :param hitlets: Hitlets which were used to build the events.
    :param contained_hitlets_ids: Array of shape n x 2 with the [start, stop)
        indices into ``hitlets`` for the corresponding event.
    :param start_channel: Channel number which maps to index 0 of
        ``area_per_channel``, e.g. 2000 for nveto.

    """
    n_events = min(len(events), len(contained_hitlets_ids))
    for idx in range(n_events):
        ev = events[idx]
        first = contained_hitlets_ids[idx, 0]
        last = contained_hitlets_ids[idx, 1]
        hits = hitlets[first:last]

        total_area = np.sum(hits["area"])
        ev["area"] = total_area
        ev["n_hits"] = len(hits)
        ev["n_contributing_pmt"] = len(np.unique(hits["channel"]))

        # Accumulate the area seen by each channel:
        for h in hits:
            channel_index = h["channel"] - start_channel
            ev["area_per_channel"][channel_index] += h["area"]

        # Hitlet times relative to the first hitlet of the event.
        rel_time = hits["time"] - hits[0]["time"]
        if not total_area:
            # Without area there are no weights, leave the time fields untouched.
            continue

        ev["center_time"] = np.sum(rel_time * hits["area"]) / total_area
        if ev["n_hits"] > 1 and ev["center_time"]:
            weights = hits["area"] / ev["area"]  # normalized weights
            # Square root of the area-weighted mean squared deviation.
            ev["center_time_spread"] = np.sqrt(
                np.sum(weights * np.power(rel_time - ev["center_time"], 2)) / np.sum(weights)
            )
        else:
            ev["center_time_spread"] = np.inf
141

142

143
@export
def find_veto_events(
    hitlets: np.ndarray,
    coincidence_level: int,
    resolving_time: int,
    left_extension: int,
    event_number_key: str = "event_number_nv",
    n_channel: int = 120,
) -> ty.Tuple[np.ndarray, np.ndarray]:
    """Find veto events as an n-fold coincidence of hitlets within a resolving time window.

    Every hitlet which touches a coincidence window contributes to the event.
    Only ``time`` and ``endtime`` of the returned events are set here, all
    other fields are left at zero.

    :param hitlets: Hitlets which shall be used for event creation.
    :param coincidence_level: int, coincidence level.
    :param resolving_time: int, resolving window for coincidence in ns.
    :param left_extension: int, left event extension in ns.
    :param event_number_key: str, field name for the event number.
    :param n_channel: int, number of channels in detector.
    :return: events, hitlet index ranges per event

    """
    # Intervals which satisfy the coincidence requirement:
    coincidence_windows = straxen.plugins.nveto_recorder.find_coincidence(
        hitlets,
        coincidence_level,
        resolving_time,
        left_extension,
    )

    # Collect all hitlets touching the windows (fully_contained cannot be used
    # since some muon signals may be larger than 300 ns). In rare cases a long
    # signal touches two windows; _solve_ambiguity merges those index ranges.
    hitlet_ranges = _solve_ambiguity(strax.touching_windows(hitlets, coincidence_windows))

    # One zero-initialised event per remaining index range:
    event_dtype = veto_event_dtype(event_number_key, n_channel)
    events = np.zeros(len(hitlet_ranges), dtype=event_dtype)
    _make_event(hitlets, hitlet_ranges, events)
    return events, hitlet_ranges
187

188

189
@numba.njit(cache=True, nogil=False)
def _solve_ambiguity(contained_hitlets_ids: np.ndarray) -> np.ndarray:
    """Merge consecutive hitlet index ranges which overlap.

    A single hitlet can overlap with two event intervals, e.g. for muon signals
    with a long tail, since the coincidence window is a fixed window and the
    tail can extend beyond it. Overlapping ranges are joined into one.

    :param contained_hitlets_ids: Array of shape n x 2 with [start, stop)
        hitlet indices per event interval.
    :return: Array of shape m x 2 (m <= n) with the merged index ranges.

    """
    merged = np.zeros(contained_hitlets_ids.shape, dtype=contained_hitlets_ids.dtype)

    n_ranges = len(contained_hitlets_ids)
    if n_ranges == 0:
        # Nothing to merge, return the empty result
        return merged

    n_merged = 0
    start_i = contained_hitlets_ids[0, 0]
    end_i = contained_hitlets_ids[0, 1]
    for k in range(1, n_ranges):
        next_start = contained_hitlets_ids[k, 0]
        next_end = contained_hitlets_ids[k, 1]
        if end_i > next_start:
            # Next range overlaps with the current one: extend the current end.
            end_i = next_end
            continue

        # No overlap: store the finished range and open the next one.
        merged[n_merged, 0] = start_i
        merged[n_merged, 1] = end_i
        n_merged += 1
        start_i = next_start
        end_i = next_end

    # The range which is still open is the last event:
    merged[n_merged, 0] = start_i
    merged[n_merged, 1] = end_i
    n_merged += 1
    return merged[:n_merged]
221

222

223
@numba.njit(cache=True, nogil=True)
def _make_event(hitlets: np.ndarray, hitlet_ids: np.ndarray, res: np.ndarray):
    """Set ``time`` and ``endtime`` of each veto event in ``res`` in place.

    :param hitlets: Hitlets from which the events are built.
    :param hitlet_ids: Array of shape n x 2 with the [start, stop) hitlet
        indices of each event.
    :param res: Event buffer with one entry per row of ``hitlet_ids``.

    """
    for k in range(len(hitlet_ids)):
        members = hitlets[hitlet_ids[k, 0] : hitlet_ids[k, 1]]
        event = res[k]
        # The event starts with its first hitlet and ends with the latest
        # endtime among its hitlets.
        event["time"] = members[0]["time"]
        event["endtime"] = np.max(strax.endtime(members))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc