• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

XENONnT / straxen / 11293456467

11 Oct 2024 01:41PM UTC coverage: 89.774% (-0.06%) from 89.83%
11293456467

push

github

web-flow
Fixed default window position (#1429)

* fixed default window position

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fixed empty input handling

* fixed empty record handling II

* changed area averaging

* fixed default window clipping

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fixed minimum_led_position help and docstring

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fixed whitespace at the end of help strings

* fixed area averaging help strings

* Added baseline to datatype

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Giovanni Volta <38431109+GiovanniVolta@users.noreply.github.com>
Co-authored-by: Dacheng Xu <dx2227@columbia.edu>
Co-authored-by: Yue Ma <3124558229@qq.com>

15 of 30 new or added lines in 1 file covered. (50.0%)

4 existing lines in 2 files now uncovered.

9156 of 10199 relevant lines covered (89.77%)

1.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.05
/straxen/plugins/led_cal/led_calibration.py
1
"""
2
Dear nT analyser,
3
if you want to complain please contact:
4
    chiara@physik.uzh.ch,
5
    gvolta@physik.uzh.ch,
6
    kazama@isee.nagoya-u.ac.jp
7
    torben.flehmke@fysik.su.se
8
"""
9

10
from immutabledict import immutabledict
2✔
11
import strax
2✔
12
import straxen
2✔
13
import numba
2✔
14
import numpy as np
2✔
15
import scipy.stats as sps
2✔
16

17
# This makes sure shorthands for only the necessary functions
18
# are made available under straxen.[...]
19
export, __all__ = strax.exporter()
2✔
20

21
channel_list = [i for i in range(494)]
2✔
22

23

24
@export
2✔
25
class LEDCalibration(strax.Plugin):
2✔
26
    """
27
    Preliminary version, several parameters to set during commissioning.
28
    LEDCalibration returns: channel, time, dt, length, Area,
29
    amplitudeLED and amplitudeNOISE.
30

31
    The new variables are:
32
        - Area: Area computed in the given window, averaged over 6
33
          windows that have the same starting sample and different end
34
          samples.
35
        - amplitudeLED: peak amplitude of the LED on run in the given
36
          window.
37
        - amplitudeNOISE: amplitude of the LED on run in a window far
38
          from the signal one.
39
    """
40

41
    __version__ = "0.3.0"
2✔
42

43
    depends_on = "raw_records"
2✔
44
    data_kind = "led_cal"
2✔
45
    compressor = "zstd"
2✔
46
    parallel = "process"
2✔
47
    rechunk_on_save = False
2✔
48

49
    led_cal_record_length = straxen.URLConfig(
2✔
50
        default=160, infer_type=False, help="Length (samples) of one record without 0 padding."
51
    )
52

53
    baseline_window = straxen.URLConfig(
2✔
54
        default=(0, 40),
55
        infer_type=False,
56
        help="Window (samples) for baseline calculation.",
57
    )
58

59
    minimum_led_position = straxen.URLConfig(
2✔
60
        default=60,
61
        infer_type=False,
62
        help=(
63
            "The minimum sample index to consider for LED hits. Hits before this sample are "
64
            "ignored."
65
        ),
66
    )
67

68
    led_hit_extension = straxen.URLConfig(
2✔
69
        default=(-8, 32),
70
        infer_type=False,
71
        help="The extension around the LED hit to integrate.",
72
    )
73

74
    area_averaging_length = straxen.URLConfig(
2✔
75
        default=7,
76
        infer_type=False,
77
        help=(
78
            "The total length of the averaging window for the area calculation. "
79
            "To mitigate a possiple bias from noise, the area is integrated multiple times with "
80
            "sligntly different window lengths and then averaged. area_averaging_length should "
81
            "be divisible by area_averaging_step."
82
        ),
83
    )
84

85
    area_averaging_step = straxen.URLConfig(
2✔
86
        default=1,
87
        infer_type=False,
88
        help=(
89
            "The step size used for the different windows, averaged for the area calculation. "
90
            "To mitigate a possiple bias from noise, the area is integrated multiple times with "
91
            "sligntly different window lengths and then averaged. area_averaging_length should "
92
            "be divisible by area_averaging_step."
93
        ),
94
    )
95

96
    noise_window = straxen.URLConfig(
2✔
97
        default=(10, 50), infer_type=False, help="Window (samples) to analyse the noise"
98
    )
99

100
    channel_list = straxen.URLConfig(
2✔
101
        default=(tuple(channel_list)),
102
        infer_type=False,
103
        help="List of PMTs. Defalt value: all the PMTs",
104
    )
105

106
    led_cal_hit_min_height_over_noise = straxen.URLConfig(
2✔
107
        default=6,
108
        infer_type=False,
109
        help=(
110
            "Minimum hit amplitude in numbers of baseline_rms above baseline. "
111
            "Actual threshold used is max(hit_min_amplitude, hit_min_"
112
            "height_over_noise * baseline_rms)."
113
        ),
114
    )
115

116
    dtype = [
2✔
117
        (("Area averaged in integration windows", "area"), np.float32),
118
        (("Amplitude in LED window", "amplitude_led"), np.float32),
119
        (("Amplitude in off LED window", "amplitude_noise"), np.float32),
120
        (("Channel", "channel"), np.int16),
121
        (("Start time of the interval (ns since unix epoch)", "time"), np.int64),
122
        (("Time resolution in ns", "dt"), np.int16),
123
        (("Length of the interval in samples", "length"), np.int32),
124
        (("Whether there was a hit found in the record", "triggered"), bool),
125
        (("Sample index of the hit that defines the window position", "hit_position"), np.uint8),
126
        (("Window used for integration", "integration_window"), np.uint8, (2,)),
127
        (("Baseline from the record", "baseline"), np.float32),
128
    ]
129

130
    def compute(self, raw_records):
2✔
131
        """The data for LED calibration are build for those PMT which belongs to channel list.
132

133
        This is used for the different ligh levels. As default value all the PMTs are considered.
134

135
        """
136
        mask = np.where(np.in1d(raw_records["channel"], self.channel_list))[0]
2✔
137
        raw_records_active_channels = raw_records[mask]
2✔
138
        records = get_records(
2✔
139
            raw_records_active_channels, self.baseline_window, self.led_cal_record_length
140
        )
141
        del raw_records_active_channels, raw_records
2✔
142

143
        temp = np.zeros(len(records), dtype=self.dtype)
2✔
144
        strax.copy_to_buffer(records, temp, "_recs_to_temp_led")
2✔
145

146
        led_windows, triggered = get_led_windows(
2✔
147
            records,
148
            self.minimum_led_position,
149
            self.led_hit_extension,
150
            self.led_cal_hit_min_height_over_noise,
151
            self.led_cal_record_length,
152
            self.area_averaging_length,
153
        )
154

155
        on, off = get_amplitude(records, led_windows, self.noise_window)
2✔
156
        temp["amplitude_led"] = on["amplitude"]
2✔
157
        temp["amplitude_noise"] = off["amplitude"]
2✔
158

159
        area = get_area(records, led_windows, self.area_averaging_length, self.area_averaging_step)
2✔
160
        temp["area"] = area["area"]
2✔
161

162
        temp["triggered"] = triggered
2✔
163
        temp["hit_position"] = led_windows[:, 0] - self.led_hit_extension[0]
2✔
164
        temp["integration_window"] = led_windows
2✔
165
        temp["baseline"] = records["baseline"]
2✔
166
        return temp
2✔
167

168

169
def get_records(raw_records, baseline_window, led_cal_record_length):
2✔
170
    """Determine baseline as the average of the first baseline_samples of each pulse.
171

172
    Subtract the pulse float(data) from baseline.
173

174
    """
175

176
    record_length_padded = np.shape(raw_records.dtype["data"])[0]
2✔
177

178
    _dtype = [
2✔
179
        (("Start time since unix epoch [ns]", "time"), "<i8"),
180
        (("Length of the interval in samples", "length"), "<i4"),
181
        (("Width of one sample [ns]", "dt"), "<i2"),
182
        (("Channel/PMT number", "channel"), "<i2"),
183
        (
184
            (
185
                "Length of pulse to which the record belongs (without zero-padding)",
186
                "pulse_length",
187
            ),
188
            "<i4",
189
        ),
190
        (("Fragment number in the pulse", "record_i"), "<i2"),
191
        (
192
            ("Baseline in ADC counts. data = int(baseline) - data_orig", "baseline"),
193
            "f4",
194
        ),
195
        (
196
            ("Baseline RMS in ADC counts. data = baseline - data_orig", "baseline_rms"),
197
            "f4",
198
        ),
199
        (("Waveform data in raw ADC counts with 0 padding", "data"), "f4", (record_length_padded,)),
200
    ]
201

202
    records = np.zeros(len(raw_records), dtype=_dtype)
2✔
203
    strax.copy_to_buffer(raw_records, records, "_rr_to_r_led")
2✔
204

205
    mask = np.where((records["record_i"] == 0) & (records["length"] == led_cal_record_length))[0]
2✔
206
    records = records[mask]
2✔
207
    bl = records["data"][:, baseline_window[0] : baseline_window[1]].mean(axis=1)
2✔
208
    rms = records["data"][:, baseline_window[0] : baseline_window[1]].std(axis=1)
2✔
209
    records["data"][:, :led_cal_record_length] = (
2✔
210
        -1.0 * (records["data"][:, :led_cal_record_length].transpose() - bl[:]).transpose()
211
    )
212
    records["baseline"] = bl
2✔
213
    records["baseline_rms"] = rms
2✔
214
    return records
2✔
215

216

217
def get_led_windows(
2✔
218
    records,
219
    minimum_led_position,
220
    led_hit_extension,
221
    hit_min_height_over_noise,
222
    record_length,
223
    area_averaging_length,
224
):
225
    """Search for hits in the records, if a hit is found, return an interval around the hit given by
226
    led_hit_extension. If no hit is found in the record, return the default window.
227

228
    :param records: Array of the records to search for LED hits.
229
    :param minimum_led_position: The minimum simple index of the LED hits. Hits before this sample
230
        are ignored.
231
    :param led_hit_extension: The integration window around the first hit found to use. A tuple of
232
        form (samples_before, samples_after) the first LED hit.
233
    :param hit_min_amplitude: Minimum amplitude of the signal to be considered a hit.
234
    :param hit_min_height_over_noise: Minimum height of the signal over noise to be considered a
235
        hit. :return (len(records), 2) array: Integration window for each record
236
    :param record_length: The length of one led_calibration record
237
    :param area_averaging_length: The length (samples) of the window to do the averaging on.
238

239
    """
240
    if len(records) == 0:  # If input is empty, return empty arrays of correct shape
2✔
241
        return np.empty((0, 2), dtype=np.int64), np.empty(0, dtype=bool)
2✔
242

243
    hits = strax.find_hits(
×
244
        records,
245
        min_amplitude=0,  # Always use the height over noise threshold.
246
        min_height_over_noise=hit_min_height_over_noise,
247
    )
248

NEW
249
    maximum_led_position = record_length - area_averaging_length - led_hit_extension[1]
×
NEW
250
    hits = hits[hits["left"] >= minimum_led_position]
×
251
    # Check if the records are sorted properly by 'record_i' first and 'time' second and sort them
252
    # if they are not
253
    record_i = hits["record_i"]
×
254
    time = hits["time"]
×
255
    if not (
×
256
        np.all(
257
            (record_i[:-1] < record_i[1:])
258
            | ((record_i[:-1] == record_i[1:]) & (time[:-1] <= time[1:]))
259
        )
260
    ):
261
        hits.sort(order=["record_i", "time"])
×
262

NEW
263
    if len(hits) == 0:  # This really should not be the case. But in case it is:
×
NEW
264
        default_hit_position = minimum_led_position
×
265
    else:
NEW
266
        default_hit_position, _ = sps.mode(hits["left"])
×
267

NEW
268
        if isinstance(default_hit_position, np.ndarray):
×
NEW
269
            default_hit_position = default_hit_position[0]
×
270

NEW
271
        if default_hit_position > maximum_led_position:
×
NEW
272
            default_hit_position = maximum_led_position
×
273

NEW
274
    triggered = np.zeros(len(records), dtype=bool)
×
275

NEW
276
    default_windows = np.tile(default_hit_position + np.array(led_hit_extension), (len(records), 1))
×
UNCOV
277
    return _get_led_windows(
×
278
        hits, default_windows, led_hit_extension, maximum_led_position, triggered
279
    )
280

281

282
@numba.jit(nopython=True)
2✔
283
def _get_led_windows(hits, default_windows, led_hit_extension, maximum_led_position, triggered):
2✔
UNCOV
284
    windows = default_windows
×
285
    last = -1
×
286

287
    for hit in hits:
×
288
        if hit["record_i"] == last:
×
289
            continue  # If there are multiple hits in one record, ignore after the first
×
290

NEW
291
        triggered[hit["record_i"]] = True
×
292

UNCOV
293
        hit_left = hit["left"]
×
294
        # Limit the position of the window so it stays inside the record.
NEW
295
        if hit_left > maximum_led_position:
×
NEW
296
            hit_left = maximum_led_position
×
297

298
        left = hit_left + led_hit_extension[0]
×
299
        right = hit_left + led_hit_extension[1]
×
300

301
        windows[hit["record_i"]] = np.array([left, right])
×
302
        last = hit["record_i"]
×
303

NEW
304
    return windows, triggered
×
305

306

307
_on_off_dtype = np.dtype([("channel", "int16"), ("amplitude", "float32")])
2✔
308

309

310
@numba.jit(nopython=True)
2✔
311
def get_amplitude(records, led_windows, noise_window):
2✔
312
    """Needed for the SPE computation.
313

314
    Get the maximum of the signal in two different regions, one where there is no signal, and one
315
    where there is.
316

317
    :param records: Array of records
318
    :param ndarray led_windows : 2d array of shape (len(records), 2) with the window to use as the
319
        signal on area for each record. Inclusive left boundary and exclusive right boundary.
320
    :param tuple noise_window: Tuple with the window, used for the signal off area for all records.
321
    :return ndarray ons: 1d array of length len(records). The maximum amplitude in the led window
322
        area for each record.
323
    :return ndarray offs: 1d array of length len(records). The maximum amplitude in the noise area
324
        for each record.
325

326
    """
327
    ons = np.zeros(len(records), dtype=_on_off_dtype)
2✔
328
    offs = np.zeros(len(records), dtype=_on_off_dtype)
2✔
329

330
    for i, record in enumerate(records):
2✔
331
        ons[i]["channel"] = record["channel"]
×
332
        offs[i]["channel"] = record["channel"]
×
333

334
        ons[i]["amplitude"] = np.max(record["data"][led_windows[i, 0] : led_windows[i, 1]])
×
335
        offs[i]["amplitude"] = np.max(record["data"][noise_window[0] : noise_window[1]])
×
336

337
    return ons, offs
2✔
338

339

340
_area_dtype = np.dtype([("channel", "int16"), ("area", "float32")])
2✔
341

342

343
@numba.jit(nopython=True)
2✔
344
def get_area(records, led_windows, area_averaging_length, area_averaging_step):
2✔
345
    """Needed for the gain computation.
346

347
    Integrate the record in the defined window area. To reduce the effects of the noise, this is
348
    done with 6 different window lengths, which are then averaged.
349

350
    :param records: Array of records
351
    :param ndarray led_windows : 2d array of shape (len(records), 2) with the window to use as the
352
        integration boundaries.
353
    :param area_averaging_length: The total length in records of the window over which to do the
354
        averaging of the areas.
355
    :param area_averaging_step: The increase in length for each step of the averaging.
356
    :return ndarray area: 1d array of length len(records) with the averaged integrated areas for
357
        each record.
358

359
    """
360
    area = np.zeros(len(records), dtype=_area_dtype)
2✔
361
    end_pos = np.arange(0, area_averaging_length + area_averaging_step, area_averaging_step)
2✔
362

363
    for i, record in enumerate(records):
2✔
364
        area[i]["channel"] = record["channel"]
×
365
        for right in end_pos:
×
366
            area[i]["area"] += np.sum(record["data"][led_windows[i, 0] : led_windows[i, 1] + right])
×
367

368
        area[i]["area"] /= float(len(end_pos))
×
369

370
    return area
2✔
371

372

373
@export
2✔
374
class nVetoExtTimings(strax.Plugin):
2✔
375
    """Plugin which computes the time difference `delta_time` from pulse timing of `hitlets_nv` to
376
    start time of `raw_records` which belong the `hitlets_nv`.
377

378
    They are used as the external trigger timings.
379

380
    """
381

382
    __version__ = "0.0.1"
2✔
383

384
    depends_on = ("raw_records_nv", "hitlets_nv")
2✔
385
    provides = "ext_timings_nv"
2✔
386
    data_kind = "hitlets_nv"
2✔
387

388
    compressor = "zstd"
2✔
389

390
    channel_map = straxen.URLConfig(
2✔
391
        track=False,
392
        type=immutabledict,
393
        help="immutabledict mapping subdetector to (min, max) channel number.",
394
    )
395

396
    def infer_dtype(self):
2✔
397
        dtype = []
2✔
398
        dtype += strax.time_dt_fields
2✔
399
        dtype += [
2✔
400
            (("Delta time from trigger timing [ns]", "delta_time"), np.int16),
401
            (
402
                ("Index to which pulse (not record) the hitlet belongs to.", "pulse_i"),
403
                np.int32,
404
            ),
405
        ]
406
        return dtype
2✔
407

408
    def setup(self):
2✔
409
        self.nv_pmt_start = self.channel_map["nveto"][0]
2✔
410
        self.nv_pmt_stop = self.channel_map["nveto"][1] + 1
2✔
411

412
    def compute(self, hitlets_nv, raw_records_nv):
2✔
413
        rr_nv = raw_records_nv[raw_records_nv["record_i"] == 0]
2✔
414
        pulses = np.zeros(len(rr_nv), dtype=self.pulse_dtype())
2✔
415
        pulses["time"] = rr_nv["time"]
2✔
416
        pulses["endtime"] = rr_nv["time"] + rr_nv["pulse_length"] * rr_nv["dt"]
2✔
417
        pulses["channel"] = rr_nv["channel"]
2✔
418

419
        ext_timings_nv = np.zeros_like(hitlets_nv, dtype=self.dtype)
2✔
420
        ext_timings_nv["time"] = hitlets_nv["time"]
2✔
421
        ext_timings_nv["length"] = hitlets_nv["length"]
2✔
422
        ext_timings_nv["dt"] = hitlets_nv["dt"]
2✔
423
        self.calc_delta_time(
2✔
424
            ext_timings_nv, pulses, hitlets_nv, self.nv_pmt_start, self.nv_pmt_stop
425
        )
426

427
        return ext_timings_nv
2✔
428

429
    @staticmethod
2✔
430
    def pulse_dtype():
2✔
431
        pulse_dtype = []
2✔
432
        pulse_dtype += strax.time_fields
2✔
433
        pulse_dtype += [(("PMT channel", "channel"), np.int16)]
2✔
434
        return pulse_dtype
2✔
435

436
    @staticmethod
2✔
437
    def calc_delta_time(ext_timings_nv_delta_time, pulses, hitlets_nv, nv_pmt_start, nv_pmt_stop):
2✔
438
        """Numpy access with fancy index returns copy, not view This for-loop is required to
439
        substitute in one by one."""
440
        hitlet_index = np.arange(len(hitlets_nv))
2✔
441
        pulse_index = np.arange(len(pulses))
2✔
442
        for ch in range(nv_pmt_start, nv_pmt_stop):
2✔
443
            mask_hitlets_in_channel = hitlets_nv["channel"] == ch
2✔
444
            hitlet_in_channel_index = hitlet_index[mask_hitlets_in_channel]
2✔
445

446
            mask_pulse_in_channel = pulses["channel"] == ch
2✔
447
            pulse_in_channel_index = pulse_index[mask_pulse_in_channel]
2✔
448

449
            hitlets_in_channel = hitlets_nv[hitlet_in_channel_index]
2✔
450
            pulses_in_channel = pulses[pulse_in_channel_index]
2✔
451
            hit_in_pulse_index = strax.fully_contained_in(hitlets_in_channel, pulses_in_channel)
2✔
452
            for h_i, p_i in zip(hitlet_in_channel_index, hit_in_pulse_index):
2✔
453
                if p_i == -1:
2✔
454
                    continue
×
455
                res = ext_timings_nv_delta_time[h_i]
2✔
456

457
                res["delta_time"] = (
2✔
458
                    hitlets_nv[h_i]["time"]
459
                    + hitlets_nv[h_i]["time_amplitude"]
460
                    - pulses_in_channel[p_i]["time"]
461
                )
462
                res["pulse_i"] = pulse_in_channel_index[p_i]
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc