• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

XENONnT / straxen / 10572678510

27 Aug 2024 06:11AM UTC coverage: 90.824% (+0.02%) from 90.806%
10572678510

Pull #1415

github

web-flow
Merge daf393010 into c61848018
Pull Request #1415: Remove configuration `sum_waveform_top_array` from `MergedS2s`

9156 of 10081 relevant lines covered (90.82%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.71
/straxen/plugins/led_cal/led_calibration.py
1
"""
2
Dear nT analyser,
3
if you want to complain please contact:
4
    chiara@physik.uzh.ch,
5
    gvolta@physik.uzh.ch,
6
    kazama@isee.nagoya-u.ac.jp
7
    torben.flehmke@fysik.su.se
8
"""
9

10
from immutabledict import immutabledict
2✔
11
import strax
2✔
12
import straxen
2✔
13
import numba
2✔
14
import numpy as np
2✔
15

16
# This makes sure shorthands for only the necessary functions
17
# are made available under straxen.[...]
18
export, __all__ = strax.exporter()
2✔
19

20
channel_list = [i for i in range(494)]
2✔
21

22

23
@export
2✔
24
class LEDCalibration(strax.Plugin):
2✔
25
    """
26
    Preliminary version, several parameters to set during commissioning.
27
    LEDCalibration returns: channel, time, dt, length, Area,
28
    amplitudeLED and amplitudeNOISE.
29

30
    The new variables are:
31
        - Area: Area computed in the given window, averaged over 6
32
          windows that have the same starting sample and different end
33
          samples.
34
        - amplitudeLED: peak amplitude of the LED on run in the given
35
          window.
36
        - amplitudeNOISE: amplitude of the LED on run in a window far
37
          from the signal one.
38
    """
39

40
    __version__ = "0.3.0"
2✔
41

42
    depends_on = "raw_records"
2✔
43
    data_kind = "led_cal"
2✔
44
    compressor = "zstd"
2✔
45
    parallel = "process"
2✔
46
    rechunk_on_save = False
2✔
47

48
    LED_cal_record_length = straxen.URLConfig(
2✔
49
        default=160, infer_type=False, help="Length (samples) of one record without 0 padding."
50
    )
51

52
    baseline_window = straxen.URLConfig(
2✔
53
        default=(0, 40),
54
        infer_type=False,
55
        help="Window (samples) for baseline calculation.",
56
    )
57

58
    default_led_window = straxen.URLConfig(
2✔
59
        default=(78, 107),
60
        infer_type=False,
61
        help="Default window (samples) to integrate and get the maximum amplitude \
62
            if no hit was found in the record.",
63
    )
64

65
    led_hit_extension = straxen.URLConfig(
2✔
66
        default=(-5, 24),
67
        infer_type=False,
68
        help="The extension around the LED hit to integrate.",
69
    )
70

71
    area_averaging_length = straxen.URLConfig(
2✔
72
        default=10,
73
        infer_type=False,
74
        help=(
75
            "The total length of the averaging window for the area calculation."
76
            "To mitigate a possiple bias from noise, the area is integrated multiple times with"
77
            "sligntly different window lengths and then averaged. integration_averaging_length"
78
            "should be divisible by step."
79
        ),
80
    )
81

82
    area_averaging_step = straxen.URLConfig(
2✔
83
        default=2,
84
        infer_type=False,
85
        help=(
86
            "The step size used for the different windows, averaged for the area calculation."
87
            "To mitigate a possiple bias from noise, the area is integrated multiple times with"
88
            "sligntly different window lengths and then averaged. integration_averaging_length"
89
            "should be divisible by step."
90
        ),
91
    )
92

93
    noise_window = straxen.URLConfig(
2✔
94
        default=(10, 48), infer_type=False, help="Window (samples) to analyse the noise"
95
    )
96

97
    channel_list = straxen.URLConfig(
2✔
98
        default=(tuple(channel_list)),
99
        infer_type=False,
100
        help="List of PMTs. Defalt value: all the PMTs",
101
    )
102

103
    hit_min_height_over_noise = straxen.URLConfig(
2✔
104
        default=4,
105
        infer_type=False,
106
        help=(
107
            "Minimum hit amplitude in numbers of baseline_rms above baseline."
108
            "Actual threshold used is max(hit_min_amplitude, hit_min_"
109
            "height_over_noise * baseline_rms)."
110
        ),
111
    )
112

113
    dtype = [
2✔
114
        ("area", np.float32, "Area averaged in integration windows"),
115
        ("amplitude_led", np.float32, "Amplitude in LED window"),
116
        ("amplitude_noise", np.float32, "Amplitude in off LED window"),
117
        ("channel", np.int16, "Channel"),
118
        ("time", np.int64, "Start time of the interval (ns since unix epoch)"),
119
        ("dt", np.int16, "Time resolution in ns"),
120
        ("length", np.int32, "Length of the interval in samples"),
121
    ]
122

123
    def compute(self, raw_records):
2✔
124
        """The data for LED calibration are build for those PMT which belongs to channel list.
125

126
        This is used for the different ligh levels. As default value all the PMTs are considered.
127

128
        """
129
        mask = np.where(np.in1d(raw_records["channel"], self.channel_list))[0]
2✔
130
        raw_records_active_channels = raw_records[mask]
2✔
131
        records = get_records(
2✔
132
            raw_records_active_channels, self.baseline_window, self.LED_cal_record_length
133
        )
134
        del raw_records_active_channels, raw_records
2✔
135

136
        temp = np.zeros(len(records), dtype=self.dtype)
2✔
137
        strax.copy_to_buffer(records, temp, "_recs_to_temp_led")
2✔
138

139
        led_windows = get_led_windows(
2✔
140
            records,
141
            self.default_led_window,
142
            self.led_hit_extension,
143
            self.hit_min_height_over_noise,
144
            self.LED_cal_record_length,
145
            self.area_averaging_length,
146
        )
147

148
        on, off = get_amplitude(records, led_windows, self.noise_window)
2✔
149
        temp["amplitude_led"] = on["amplitude"]
2✔
150
        temp["amplitude_noise"] = off["amplitude"]
2✔
151

152
        area = get_area(records, led_windows, self.area_averaging_length, self.area_averaging_step)
2✔
153
        temp["area"] = area["area"]
2✔
154
        return temp
2✔
155

156

157
def get_records(raw_records, baseline_window, LED_cal_record_length):
2✔
158
    """Determine baseline as the average of the first baseline_samples of each pulse.
159

160
    Subtract the pulse float(data) from baseline.
161

162
    """
163

164
    record_length_padded = np.shape(raw_records.dtype["data"])[0]
2✔
165

166
    _dtype = [
2✔
167
        (("Start time since unix epoch [ns]", "time"), "<i8"),
168
        (("Length of the interval in samples", "length"), "<i4"),
169
        (("Width of one sample [ns]", "dt"), "<i2"),
170
        (("Channel/PMT number", "channel"), "<i2"),
171
        (
172
            (
173
                "Length of pulse to which the record belongs (without zero-padding)",
174
                "pulse_length",
175
            ),
176
            "<i4",
177
        ),
178
        (("Fragment number in the pulse", "record_i"), "<i2"),
179
        (
180
            ("Baseline in ADC counts. data = int(baseline) - data_orig", "baseline"),
181
            "f4",
182
        ),
183
        (
184
            ("Baseline RMS in ADC counts. data = baseline - data_orig", "baseline_rms"),
185
            "f4",
186
        ),
187
        (("Waveform data in raw ADC counts with 0 padding", "data"), "f4", (record_length_padded,)),
188
    ]
189

190
    records = np.zeros(len(raw_records), dtype=_dtype)
2✔
191
    strax.copy_to_buffer(raw_records, records, "_rr_to_r_led")
2✔
192

193
    mask = np.where((records["record_i"] == 0) & (records["length"] == LED_cal_record_length))[0]
2✔
194
    records = records[mask]
2✔
195
    bl = records["data"][:, baseline_window[0] : baseline_window[1]].mean(axis=1)
2✔
196
    rms = records["data"][:, baseline_window[0] : baseline_window[1]].std(axis=1)
2✔
197
    records["data"][:, :LED_cal_record_length] = (
2✔
198
        -1.0 * (records["data"][:, :LED_cal_record_length].transpose() - bl[:]).transpose()
199
    )
200
    records["baseline"] = bl
2✔
201
    records["baseline_rms"] = rms
2✔
202
    return records
2✔
203

204

205
def get_led_windows(
2✔
206
    records,
207
    default_window,
208
    led_hit_extension,
209
    hit_min_height_over_noise,
210
    record_length,
211
    area_averaging_length,
212
):
213
    """Search for hits in the records, if a hit is found, return an interval around the hit given by
214
    led_hit_extension. If no hit is found in the record, return the default window.
215

216
    :param records: Array of the records to search for LED hits.
217
    :param default_window: Default window to use if no LED hit is found given as a tuple (start,
218
        end)
219
    :param led_hit_extension: The integration window around the first hit found to use. A tuple of
220
        form (samples_before, samples_after) the first LED hit.
221
    :param hit_min_amplitude: Minimum amplitude of the signal to be considered a hit.
222
    :param hit_min_height_over_noise: Minimum height of the signal over noise to be considered a
223
        hit. :return (len(records), 2) array: Integration window for each record
224
    :param record_length: The length of one led_calibration record
225
    :param area_averaging_length: The length (samples) of the window to do the averaging on.
226

227
    """
228
    if len(records) == 0:  # If input is empty
2✔
229
        return np.empty((0, 2), dtype=np.int64)  # Return empty array of correct shape
2✔
230

231
    hits = strax.find_hits(
×
232
        records,
233
        min_amplitude=0,  # Always use the height over noise threshold.
234
        min_height_over_noise=hit_min_height_over_noise,
235
    )
236
    # Check if the records are sorted properly by 'record_i' first and 'time' second and if them if
237
    # they are not
238
    record_i = hits["record_i"]
×
239
    time = hits["time"]
×
240
    if not (
×
241
        np.all(
242
            (record_i[:-1] < record_i[1:])
243
            | ((record_i[:-1] == record_i[1:]) & (time[:-1] <= time[1:]))
244
        )
245
    ):
246
        hits.sort(order=["record_i", "time"])
×
247

248
    default_windows = np.tile(default_window, (len(records), 1))
×
249
    return _get_led_windows(
×
250
        hits, default_windows, led_hit_extension, record_length, area_averaging_length
251
    )
252

253

254
@numba.jit(nopython=True)
2✔
255
def _get_led_windows(
2✔
256
    hits, default_windows, led_hit_extension, record_length, area_averaging_length
257
):
258
    windows = default_windows
×
259
    hit_left_min = default_windows[0, 0] - led_hit_extension[0]
×
260
    hit_left_max = record_length - area_averaging_length - led_hit_extension[1]
×
261
    last = -1
×
262

263
    for hit in hits:
×
264
        if hit["record_i"] == last:
×
265
            continue  # If there are multiple hits in one record, ignore after the first
×
266

267
        hit_left = hit["left"]
×
268
        # Limit the position of the window so it stays inside the record.
269
        if hit_left < hit_left_min:
×
270
            hit_left = hit_left_min
×
271
        if hit_left > hit_left_max:
×
272
            hit_left = hit_left_max
×
273

274
        left = hit_left + led_hit_extension[0]
×
275
        right = hit_left + led_hit_extension[1]
×
276

277
        windows[hit["record_i"]] = np.array([left, right])
×
278
        last = hit["record_i"]
×
279

280
    return windows
×
281

282

283
_on_off_dtype = np.dtype([("channel", "int16"), ("amplitude", "float32")])
2✔
284

285

286
@numba.jit(nopython=True)
2✔
287
def get_amplitude(records, led_windows, noise_window):
2✔
288
    """Needed for the SPE computation.
289

290
    Get the maximum of the signal in two different regions, one where there is no signal, and one
291
    where there is.
292

293
    :param records: Array of records
294
    :param ndarray led_windows : 2d array of shape (len(records), 2) with the window to use as the
295
        signal on area for each record. Inclusive left boundary and exclusive right boundary.
296
    :param tuple noise_window: Tuple with the window, used for the signal off area for all records.
297
    :return ndarray ons: 1d array of length len(records). The maximum amplitude in the led window
298
        area for each record.
299
    :return ndarray offs: 1d array of length len(records). The maximum amplitude in the noise area
300
        for each record.
301

302
    """
303
    ons = np.zeros(len(records), dtype=_on_off_dtype)
2✔
304
    offs = np.zeros(len(records), dtype=_on_off_dtype)
2✔
305

306
    for i, record in enumerate(records):
2✔
307
        ons[i]["channel"] = record["channel"]
×
308
        offs[i]["channel"] = record["channel"]
×
309

310
        ons[i]["amplitude"] = np.max(record["data"][led_windows[i, 0] : led_windows[i, 1]])
×
311
        offs[i]["amplitude"] = np.max(record["data"][noise_window[0] : noise_window[1]])
×
312

313
    return ons, offs
2✔
314

315

316
_area_dtype = np.dtype([("channel", "int16"), ("area", "float32")])
2✔
317

318

319
@numba.jit(nopython=True)
2✔
320
def get_area(records, led_windows, area_averaging_length, area_averaging_step):
2✔
321
    """Needed for the gain computation.
322

323
    Integrate the record in the defined window area. To reduce the effects of the noise, this is
324
    done with 6 different window lengths, which are then averaged.
325

326
    :param records: Array of records
327
    :param ndarray led_windows : 2d array of shape (len(records), 2) with the window to use as the
328
        integration boundaries.
329
    :param area_averaging_length: The total length in records of the window over which to do the
330
        averaging of the areas.
331
    :param area_averaging_step: The increase in length for each step of the averaging.
332
    :return ndarray area: 1d array of length len(records) with the averaged integrated areas for
333
        each record.
334

335
    """
336
    area = np.zeros(len(records), dtype=_area_dtype)
2✔
337
    end_pos = np.arange(0, area_averaging_length + area_averaging_step, area_averaging_step)
2✔
338

339
    for i, record in enumerate(records):
2✔
340
        area[i]["channel"] = record["channel"]
×
341
        for right in end_pos:
×
342
            area[i]["area"] += np.sum(record["data"][led_windows[i, 0] : led_windows[i, 1] + right])
×
343

344
        area[i]["area"] /= float(len(end_pos))
×
345

346
    return area
2✔
347

348

349
@export
2✔
350
class nVetoExtTimings(strax.Plugin):
2✔
351
    """Plugin which computes the time difference `delta_time` from pulse timing of `hitlets_nv` to
352
    start time of `raw_records` which belong the `hitlets_nv`.
353

354
    They are used as the external trigger timings.
355

356
    """
357

358
    __version__ = "0.0.1"
2✔
359

360
    depends_on = ("raw_records_nv", "hitlets_nv")
2✔
361
    provides = "ext_timings_nv"
2✔
362
    data_kind = "hitlets_nv"
2✔
363

364
    compressor = "zstd"
2✔
365

366
    channel_map = straxen.URLConfig(
2✔
367
        track=False,
368
        type=immutabledict,
369
        help="immutabledict mapping subdetector to (min, max) channel number.",
370
    )
371

372
    def infer_dtype(self):
2✔
373
        dtype = []
2✔
374
        dtype += strax.time_dt_fields
2✔
375
        dtype += [
2✔
376
            (("Delta time from trigger timing [ns]", "delta_time"), np.int16),
377
            (
378
                ("Index to which pulse (not record) the hitlet belongs to.", "pulse_i"),
379
                np.int32,
380
            ),
381
        ]
382
        return dtype
2✔
383

384
    def setup(self):
2✔
385
        self.nv_pmt_start = self.channel_map["nveto"][0]
2✔
386
        self.nv_pmt_stop = self.channel_map["nveto"][1] + 1
2✔
387

388
    def compute(self, hitlets_nv, raw_records_nv):
2✔
389
        rr_nv = raw_records_nv[raw_records_nv["record_i"] == 0]
2✔
390
        pulses = np.zeros(len(rr_nv), dtype=self.pulse_dtype())
2✔
391
        pulses["time"] = rr_nv["time"]
2✔
392
        pulses["endtime"] = rr_nv["time"] + rr_nv["pulse_length"] * rr_nv["dt"]
2✔
393
        pulses["channel"] = rr_nv["channel"]
2✔
394

395
        ext_timings_nv = np.zeros_like(hitlets_nv, dtype=self.dtype)
2✔
396
        ext_timings_nv["time"] = hitlets_nv["time"]
2✔
397
        ext_timings_nv["length"] = hitlets_nv["length"]
2✔
398
        ext_timings_nv["dt"] = hitlets_nv["dt"]
2✔
399
        self.calc_delta_time(
2✔
400
            ext_timings_nv, pulses, hitlets_nv, self.nv_pmt_start, self.nv_pmt_stop
401
        )
402

403
        return ext_timings_nv
2✔
404

405
    @staticmethod
2✔
406
    def pulse_dtype():
2✔
407
        pulse_dtype = []
2✔
408
        pulse_dtype += strax.time_fields
2✔
409
        pulse_dtype += [(("PMT channel", "channel"), np.int16)]
2✔
410
        return pulse_dtype
2✔
411

412
    @staticmethod
2✔
413
    def calc_delta_time(ext_timings_nv_delta_time, pulses, hitlets_nv, nv_pmt_start, nv_pmt_stop):
2✔
414
        """Numpy access with fancy index returns copy, not view This for-loop is required to
415
        substitute in one by one."""
416
        hitlet_index = np.arange(len(hitlets_nv))
2✔
417
        pulse_index = np.arange(len(pulses))
2✔
418
        for ch in range(nv_pmt_start, nv_pmt_stop):
2✔
419
            mask_hitlets_in_channel = hitlets_nv["channel"] == ch
2✔
420
            hitlet_in_channel_index = hitlet_index[mask_hitlets_in_channel]
2✔
421

422
            mask_pulse_in_channel = pulses["channel"] == ch
2✔
423
            pulse_in_channel_index = pulse_index[mask_pulse_in_channel]
2✔
424

425
            hitlets_in_channel = hitlets_nv[hitlet_in_channel_index]
2✔
426
            pulses_in_channel = pulses[pulse_in_channel_index]
2✔
427
            hit_in_pulse_index = strax.fully_contained_in(hitlets_in_channel, pulses_in_channel)
2✔
428
            for h_i, p_i in zip(hitlet_in_channel_index, hit_in_pulse_index):
2✔
429
                if p_i == -1:
2✔
430
                    continue
×
431
                res = ext_timings_nv_delta_time[h_i]
2✔
432

433
                res["delta_time"] = (
2✔
434
                    hitlets_nv[h_i]["time"]
435
                    + hitlets_nv[h_i]["time_amplitude"]
436
                    - pulses_in_channel[p_i]["time"]
437
                )
438
                res["pulse_i"] = pulse_in_channel_index[p_i]
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc