• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

XENONnT / straxen / 10168216310

30 Jul 2024 07:02PM UTC coverage: 90.957% (-0.2%) from 91.135%
10168216310

Pull #1401

github

web-flow
Merge d7f13cc2a into 4d6c2b7ba
Pull Request #1401: Dynamic led window

50 of 69 new or added lines in 2 files covered. (72.46%)

6 existing lines in 2 files now uncovered.

8992 of 9886 relevant lines covered (90.96%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.46
/straxen/plugins/led_cal/led_calibration.py
1
"""
2
Dear nT analyser,
3
if you want to complain please contact:
4
    chiara@physik.uzh.ch,
5
    gvolta@physik.uzh.ch,
6
    kazama@isee.nagoya-u.ac.jp
7
    torben.flehmke@fysik.su.se
8
"""
9

10
from immutabledict import immutabledict
2✔
11
import strax
2✔
12
import straxen
2✔
13
import numba
2✔
14
import numpy as np
2✔
15

16
# This makes sure shorthands for only the necessary functions
17
# are made available under straxen.[...]
18
export, __all__ = strax.exporter()
2✔
19

20
channel_list = [i for i in range(494)]
2✔
21

22

23
@export
2✔
24
class LEDCalibration(strax.Plugin):
2✔
25
    """
26
    Preliminary version, several parameters to set during commissioning.
27
    LEDCalibration returns: channel, time, dt, length, Area,
28
    amplitudeLED and amplitudeNOISE.
29

30
    The new variables are:
31
        - Area: Area computed in the given window, averaged over 6
32
          windows that have the same starting sample and different end
33
          samples.
34
        - amplitudeLED: peak amplitude of the LED on run in the given
35
          window.
36
        - amplitudeNOISE: amplitude of the LED on run in a window far
37
          from the signal one.
38
    """
39

40
    __version__ = "0.3.0"
2✔
41

42
    depends_on = "raw_records"
2✔
43
    data_kind = "led_cal"
2✔
44
    compressor = "zstd"
2✔
45
    parallel = "process"
2✔
46
    rechunk_on_save = False
2✔
47

48
    baseline_window = straxen.URLConfig(
2✔
49
        default=(0, 40),
50
        infer_type=False,
51
        help="Window (samples) for baseline calculation.",
52
    )
53

54
    default_led_window = straxen.URLConfig(
2✔
55
        default=(78, 107),
56
        infer_type=False,
57
        help="Default window (samples) to integrate and get he maximum amplitude of if no hit was found in the record.",
58
    )
59

60
    led_hit_extension = straxen.URLConfig(
2✔
61
        default=(-5, 24),
62
        infer_type=False,
63
        help="The extension around the LED hit to integrate.",
64
    )
65

66
    noise_window = straxen.URLConfig(
2✔
67
        default=(10, 48), infer_type=False, help="Window (samples) to analyse the noise"
68
    )
69

70
    channel_list = straxen.URLConfig(
2✔
71
        default=(tuple(channel_list)),
72
        infer_type=False,
73
        help="List of PMTs. Defalt value: all the PMTs",
74
    )
75

76
    hit_min_amplitude = straxen.URLConfig(
2✔
77
        track=True,
78
        infer_type=False,
79
        default="cmt://hit_thresholds_tpc?version=ONLINE&run_id=plugin.run_id",
80
        help=(
81
            "Minimum hit amplitude in ADC counts above baseline. "
82
            "Specify as a tuple of length n_tpc_pmts, or a number,"
83
            'or a string like "legacy-thresholds://pmt_commissioning_initial" which means calling'
84
            "hitfinder_thresholds.py"
85
            'or url string like "cmt://hit_thresholds_tpc?version=ONLINE" which means'
86
            "calling cmt."
87
        ),
88
    )
89

90
    hit_min_height_over_noise = straxen.URLConfig(
2✔
91
        default=4,
92
        infer_type=False,
93
        help=(
94
            "Minimum hit amplitude in numbers of baseline_rms above baseline."
95
            "Actual threshold used is max(hit_min_amplitude, hit_min_"
96
            "height_over_noise * baseline_rms)."
97
        ),
98
    )
99

100
    dtype = [
2✔
101
        ("area", np.float32, "Area averaged in integration windows"),
102
        ("amplitude_led", np.float32, "Amplitude in LED window"),
103
        ("amplitude_noise", np.float32, "Amplitude in off LED window"),
104
        ("channel", np.int16, "Channel"),
105
        ("time", np.int64, "Start time of the interval (ns since unix epoch)"),
106
        ("dt", np.int16, "Time resolution in ns"),
107
        ("length", np.int32, "Length of the interval in samples"),
108
    ]
109

110
    def compute(self, raw_records):
2✔
111
        """The data for LED calibration are build for those PMT which belongs to channel list.
112

113
        This is used for the different ligh levels. As default value all the PMTs are considered.
114

115
        """
116
        mask = np.where(np.in1d(raw_records["channel"], self.channel_list))[0]
2✔
117
        rr = raw_records[mask]
2✔
118
        r = get_records(rr, baseline_window=self.baseline_window)
2✔
119
        del rr, raw_records
2✔
120

121
        temp = np.zeros(len(r), dtype=self.dtype)
2✔
122
        strax.copy_to_buffer(r, temp, "_recs_to_temp_led")
2✔
123

124
        led_windows = get_led_windows(
2✔
125
            r,
126
            self.default_led_window,
127
            self.led_hit_extension,
128
            self.hit_min_amplitude,
129
            self.hit_min_height_over_noise,
130
        )
131

132
        on, off = get_amplitude(r, led_windows, self.noise_window)
2✔
133
        temp["amplitude_led"] = on["amplitude"]
2✔
134
        temp["amplitude_noise"] = off["amplitude"]
2✔
135

136
        area = get_area(r, led_windows)
2✔
137
        temp["area"] = area["area"]
2✔
138
        return temp
2✔
139

140

141
def get_records(raw_records, baseline_window):
2✔
142
    """Determine baseline as the average of the first baseline_samples of each pulse.
143

144
    Subtract the pulse float(data) from baseline.
145

146
    """
147

148
    record_length = np.shape(raw_records.dtype["data"])[0]
2✔
149

150
    _dtype = [
2✔
151
        (("Start time since unix epoch [ns]", "time"), "<i8"),
152
        (("Length of the interval in samples", "length"), "<i4"),
153
        (("Width of one sample [ns]", "dt"), "<i2"),
154
        (("Channel/PMT number", "channel"), "<i2"),
155
        (
156
            (
157
                "Length of pulse to which the record belongs (without zero-padding)",
158
                "pulse_length",
159
            ),
160
            "<i4",
161
        ),
162
        (("Fragment number in the pulse", "record_i"), "<i2"),
163
        (
164
            ("Baseline in ADC counts. data = int(baseline) - data_orig", "baseline"),
165
            "f4",
166
        ),
167
        (
168
            ("Baseline RMS in ADC counts. data = baseline - data_orig", "baseline_rms"),
169
            "f4",
170
        ),
171
        (("Waveform data in raw ADC counts", "data"), "f4", (record_length,)),
172
    ]
173

174
    records = np.zeros(len(raw_records), dtype=_dtype)
2✔
175
    strax.copy_to_buffer(raw_records, records, "_rr_to_r_led")
2✔
176

177
    mask = np.where((records["record_i"] == 0) & (records["length"] == 160))[0]
2✔
178
    records = records[mask]
2✔
179
    bl = records["data"][:, baseline_window[0] : baseline_window[1]].mean(axis=1)
2✔
180
    rms = records["data"][:, baseline_window[0] : baseline_window[1]].std(axis=1)
2✔
181
    records["data"][:, :160] = (
2✔
182
        -1.0 * (records["data"][:, :160].transpose() - bl[:]).transpose()
183
    )
184
    records["baseline"] = bl
2✔
185
    records["baseline_rms"] = rms
2✔
186
    return records
2✔
187

188

189
def get_led_windows(
2✔
190
    records,
191
    default_window,
192
    led_hit_extension,
193
    hit_min_amplitude,
194
    hit_min_height_over_noise,
195
):
196
    """Search for hits in the records, if a hit is found, return an interval around the hit given by
197
    led_hit_extension. If no hit is found in the record, return the default window.
198

199
    :param records: Array of the records to search for LED hits.
200
    :param default_window: Default window to use if no LED hit is found given as a tuple (start,
201
        end)
202
    :param led_hit_extension: The integration window around the first hit found to use. A tuple of
203
        form (samples_before, samples_after) the first LED hit.
204
    :param hit_min_amplitude: Minimum amplitude of the signal to be considered a hit.
205
    :param hit_min_height_over_noise: Minimum height of the signal over noise to be considered a
206
        hit. :return (len(records), 2) array: Integration window for each record
207

208
    """
209
    hits = strax.find_hits(
2✔
210
        records,
211
        min_amplitude=hit_min_amplitude,
212
        min_height_over_noise=hit_min_height_over_noise,
213
    )
214
    default_windows = np.tile(default_window, (len(records), 1))
2✔
215
    return _get_led_windows(hits, default_windows, led_hit_extension)
2✔
216

217

218
@numba.jit(nopython=True)
2✔
219
def _get_led_windows(hits, default_windows, led_hit_extension):
2✔
220
    windows = default_windows
2✔
221
    last = -1
2✔
222
    for hit in hits:
2✔
NEW
223
        if hit["record_i"] == last:
×
NEW
224
            continue  # If there are multiple hits in one record, ignore after the first
×
225

NEW
226
        left = hit["left"] + led_hit_extension[0]
×
227
        # Limit the position of the window so it stays inside the record.
NEW
228
        if left < default_windows[0, 0]:
×
NEW
229
            left = default_windows[0, 0]
×
NEW
230
        elif left > 150 - (led_hit_extension[1] - led_hit_extension[0]):
×
NEW
231
            left = 150 - (led_hit_extension[1] - led_hit_extension[0])
×
232

NEW
233
        right = hit["left"] + led_hit_extension[1]
×
NEW
234
        if right < default_windows[0, 1]:
×
NEW
235
            right = default_windows[0, 1]
×
NEW
236
        elif right > 150:
×
NEW
237
            right = 150
×
238

NEW
239
        windows[hit["record_i"]] = np.array([left, right])
×
NEW
240
        last = hit["record_i"]
×
241

242
    return windows
2✔
243

244

245
_on_off_dtype = np.dtype([("channel", "int16"), ("amplitude", "float32")])
2✔
246

247

248
@numba.jit(nopython=True)
2✔
249
def get_amplitude(records, led_windows, noise_window):
2✔
250
    """Needed for the SPE computation.
251

252
    Get the maximum of the signal in two different regions, one where there is no signal, and one
253
    where there is.
254

255
    :param records: Array of records
256
    :param ndarray led_windows : 2d array of shape (len(records), 2) with the window to use as the
257
        signal on area for each record. Inclusive left boundary and exclusive right boundary.
258
    :param tuple noise_window: Tuple with the window, used for the signal off area for all records.
259
    :return ndarray ons: 1d array of length len(records). The maximum amplitude in the led window
260
        area for each record.
261
    :return ndarray offs: 1d array of length len(records). The maximum amplitude in the noise area
262
        for each record.
263

264
    """
265
    ons = np.zeros(len(records), dtype=_on_off_dtype)
2✔
266
    offs = np.zeros(len(records), dtype=_on_off_dtype)
2✔
267

268
    for i, record in enumerate(records):
2✔
NEW
269
        ons[i]["channel"] = record["channel"]
×
NEW
270
        offs[i]["channel"] = record["channel"]
×
271

NEW
272
        ons[i]["amplitude"] = np.max(
×
273
            record["data"][led_windows[i, 0] : led_windows[i, 1]]
274
        )
UNCOV
275
        offs[i]["amplitude"] = np.max(record["data"][noise_window[0] : noise_window[1]])
×
276

277
    return ons, offs
2✔
278

279

280
_area_dtype = np.dtype([("channel", "int16"), ("area", "float32")])
2✔
281

282

283
@numba.jit(nopython=True)
2✔
284
def get_area(records, led_windows):
2✔
285
    """Needed for the gain computation.
286

287
    Integrate the record in the defined window area. To reduce the effects of the noise, this is
288
    done with 6 different window lengths, which are then averaged.
289

290
    :param records: Array of records
291
    :param ndarray led_windows : 2d array of shape (len(records), 2) with the window to use as the
292
        integration boundaries.
293
    :return ndarray area: 1d array of length len(records) with the averaged integrated areas for
294
        each record.
295

296
    """
297
    area = np.zeros(len(records), dtype=_area_dtype)
2✔
298
    end_pos = [2 * i for i in range(6)]
2✔
299

300
    for i, record in enumerate(records):
2✔
UNCOV
301
        area[i]["channel"] = record["channel"]
×
NEW
302
        for right in end_pos:
×
UNCOV
303
            area[i]["area"] += np.sum(
×
304
                record["data"][led_windows[i, 0] : led_windows[i, 1] + right]
305
            )
306

UNCOV
307
        area[i]["area"] /= float(len(end_pos))
×
308

309
    return area
2✔
310

311

312
@export
2✔
313
class nVetoExtTimings(strax.Plugin):
2✔
314
    """Plugin which computes the time difference `delta_time` from pulse timing of `hitlets_nv` to
315
    start time of `raw_records` which belong the `hitlets_nv`.
316

317
    They are used as the external trigger timings.
318

319
    """
320

321
    __version__ = "0.0.1"
2✔
322

323
    depends_on = ("raw_records_nv", "hitlets_nv")
2✔
324
    provides = "ext_timings_nv"
2✔
325
    data_kind = "hitlets_nv"
2✔
326

327
    compressor = "zstd"
2✔
328

329
    channel_map = straxen.URLConfig(
2✔
330
        track=False,
331
        type=immutabledict,
332
        help="immutabledict mapping subdetector to (min, max) channel number.",
333
    )
334

335
    def infer_dtype(self):
2✔
336
        dtype = []
2✔
337
        dtype += strax.time_dt_fields
2✔
338
        dtype += [
2✔
339
            (("Delta time from trigger timing [ns]", "delta_time"), np.int16),
340
            (
341
                ("Index to which pulse (not record) the hitlet belongs to.", "pulse_i"),
342
                np.int32,
343
            ),
344
        ]
345
        return dtype
2✔
346

347
    def setup(self):
2✔
348
        self.nv_pmt_start = self.channel_map["nveto"][0]
2✔
349
        self.nv_pmt_stop = self.channel_map["nveto"][1] + 1
2✔
350

351
    def compute(self, hitlets_nv, raw_records_nv):
2✔
352
        rr_nv = raw_records_nv[raw_records_nv["record_i"] == 0]
2✔
353
        pulses = np.zeros(len(rr_nv), dtype=self.pulse_dtype())
2✔
354
        pulses["time"] = rr_nv["time"]
2✔
355
        pulses["endtime"] = rr_nv["time"] + rr_nv["pulse_length"] * rr_nv["dt"]
2✔
356
        pulses["channel"] = rr_nv["channel"]
2✔
357

358
        ext_timings_nv = np.zeros_like(hitlets_nv, dtype=self.dtype)
2✔
359
        ext_timings_nv["time"] = hitlets_nv["time"]
2✔
360
        ext_timings_nv["length"] = hitlets_nv["length"]
2✔
361
        ext_timings_nv["dt"] = hitlets_nv["dt"]
2✔
362
        self.calc_delta_time(
2✔
363
            ext_timings_nv, pulses, hitlets_nv, self.nv_pmt_start, self.nv_pmt_stop
364
        )
365

366
        return ext_timings_nv
2✔
367

368
    @staticmethod
2✔
369
    def pulse_dtype():
2✔
370
        pulse_dtype = []
2✔
371
        pulse_dtype += strax.time_fields
2✔
372
        pulse_dtype += [(("PMT channel", "channel"), np.int16)]
2✔
373
        return pulse_dtype
2✔
374

375
    @staticmethod
2✔
376
    def calc_delta_time(
2✔
377
        ext_timings_nv_delta_time, pulses, hitlets_nv, nv_pmt_start, nv_pmt_stop
378
    ):
379
        """Numpy access with fancy index returns copy, not view This for-loop is required to
380
        substitute in one by one."""
381
        hitlet_index = np.arange(len(hitlets_nv))
2✔
382
        pulse_index = np.arange(len(pulses))
2✔
383
        for ch in range(nv_pmt_start, nv_pmt_stop):
2✔
384
            mask_hitlets_in_channel = hitlets_nv["channel"] == ch
2✔
385
            hitlet_in_channel_index = hitlet_index[mask_hitlets_in_channel]
2✔
386

387
            mask_pulse_in_channel = pulses["channel"] == ch
2✔
388
            pulse_in_channel_index = pulse_index[mask_pulse_in_channel]
2✔
389

390
            hitlets_in_channel = hitlets_nv[hitlet_in_channel_index]
2✔
391
            pulses_in_channel = pulses[pulse_in_channel_index]
2✔
392
            hit_in_pulse_index = strax.fully_contained_in(
2✔
393
                hitlets_in_channel, pulses_in_channel
394
            )
395
            for h_i, p_i in zip(hitlet_in_channel_index, hit_in_pulse_index):
2✔
396
                if p_i == -1:
2✔
UNCOV
397
                    continue
×
398
                res = ext_timings_nv_delta_time[h_i]
2✔
399

400
                res["delta_time"] = (
2✔
401
                    hitlets_nv[h_i]["time"]
402
                    + hitlets_nv[h_i]["time_amplitude"]
403
                    - pulses_in_channel[p_i]["time"]
404
                )
405
                res["pulse_i"] = pulse_in_channel_index[p_i]
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc