simonsobs / so3g, build 13146692549

04 Feb 2025 11:17PM UTC. Coverage: 55.602% (-0.05%) from 55.653%.

Pull Request #198: Work on support for numpy-2 and python-3.13
Merge 74945a13e into 6eacab786 (github / web-flow)

1335 of 2401 relevant lines covered (55.6%); 0.56 hits per line.

Source File: /python/smurf/smurf_archive.py (file coverage 0.0%: no executable line in this file is covered)
import sqlalchemy as db
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, backref

from spt3g import core
import so3g
import datetime as dt
import os
from tqdm import tqdm
import numpy as np
import yaml
import ast
from collections import namedtuple
from enum import Enum


Base = declarative_base()
Session = sessionmaker()
num_bias_lines = 16

class Files(Base):
    """Table to store file indexing info"""
    __tablename__ = 'files'
    id = db.Column(db.Integer, primary_key=True)

    path = db.Column(db.String, nullable=False)
    start = db.Column(db.DateTime)
    stop = db.Column(db.DateTime)
    frames = db.Column(db.Integer)
    channels = db.Column(db.Integer)

type_key = ['Observation', 'Wiring', 'Scan']
class FrameType(Base):
    """Enum table for storing frame types"""
    __tablename__ = "frame_type"
    id = db.Column(db.Integer, primary_key=True)
    type_name = db.Column(db.String, unique=True, nullable=True)

class Frame(Base):
    """Table to store frame indexing info"""
    __tablename__ = 'frames'
    __table_args__ = (
        db.UniqueConstraint('file_id', 'frame_idx', name='_frame_loc'),
    )

    id = db.Column(db.Integer, primary_key=True)

    file_id = db.Column(db.Integer, db.ForeignKey('files.id'))
    file = relationship("Files")

    frame_idx = db.Column(db.Integer, nullable=False)
    offset = db.Column(db.Integer, nullable=False)

    type_name = db.Column(db.String, db.ForeignKey('frame_type.type_name'))
    frame_type = relationship('FrameType')

    time = db.Column(db.DateTime, nullable=False)

    # Specific to data frames
    samples = db.Column(db.Integer)
    channels = db.Column(db.Integer)
    start = db.Column(db.DateTime)
    stop = db.Column(db.DateTime)

    def __repr__(self):
        # Identify the frame by the file it lives in and its index within
        # that file.
        return f"Frame({self.type_name})<{self.file_id}:{self.frame_idx}>"


class TimingParadigm(Enum):
    G3Timestream = 1
    SmurfUnixTime = 2
    TimingSystem = 3
    Mixed = 4

def get_sample_timestamps(frame):
    """
    Gets timestamps of samples in a G3Frame, trying the highest-precision
    method first and falling back to lower-precision methods if that fails.

    Args
    ------
        frame (core.G3Frame):
            A G3Frame(Scan) containing streamed detector data.

    Returns
    ---------
        times (np.ndarray):
            numpy array containing timestamps in seconds
        paradigm (TimingParadigm):
            Paradigm used to calculate timestamps.
    """
    if 'primary' in frame.keys():
        if False:
            # Do high precision timing calculation here when we have real data
            pass
        else:
            # Try to calculate the timestamp based on the SmurfProcessor's
            # "UnixTime" and the G3Timestream start time. "UnixTime" is a
            # 32-bit nanosecond clock that steadily increases mod 2**32.
            unix_times = np.array(frame['primary']['UnixTime'])
            for i in np.where(np.diff(unix_times) < 0)[0]:
                # This corrects for any wrap around
                unix_times[i+1:] += 2**32
            times = frame['data'].start.time / core.G3Units.s \
                + (unix_times - unix_times[0]) / 1e9

            return times, TimingParadigm.SmurfUnixTime
    else:
        # Calculate timestamps based on G3Timestream.times(). Note that this
        # only uses the timestream start and end time, and assumes samples
        # are equispaced.
        times = np.array([t.time / core.G3Units.s
                          for t in frame['data'].times()])
        return times, TimingParadigm.G3Timestream

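# Worked example of the wraparound correction above (hypothetical values):
# for unix_times = [4294967000, 200, 1200], np.diff() is negative at index
# 0, so 2**32 is added to every later sample, restoring the monotonic
# nanosecond sequence [4294967000, 4294967496, 4294968496].
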
class SmurfArchive:
    def __init__(self, archive_path, db_path=None, echo=False):
        """
        Class to manage a smurf data archive.

        Args
        -----
            archive_path (path):
                Path to the data directory
            db_path (path, optional):
                Path to the sqlite file. Defaults to ``<archive_path>/frames.db``
            echo (bool, optional):
                If true, all sql statements will print to stdout.
        """
        if db_path is None:
            db_path = os.path.join(archive_path, 'frames.db')
        self.archive_path = archive_path
        self.engine = db.create_engine(f"sqlite:///{db_path}", echo=echo)
        Session.configure(bind=self.engine)
        self.Session = sessionmaker(bind=self.engine)
        Base.metadata.create_all(self.engine)

        # Defines frame_types
        self._create_frame_types()

    def _create_frame_types(self):
        session = self.Session()
        if not session.query(FrameType).all():
            print("Creating FrameType table...")
            for k in type_key:
                ft = FrameType(type_name=k)
                session.add(ft)
            session.commit()

    def add_file(self, path, session):
        """
        Indexes a single file and adds it to the sqlite database.

        Args
        ----
            path (path): Path of the file to index
            session (Session): SQLAlchemy session used for the database inserts
        """

        frame_types = {
            ft.type_name: ft for ft in session.query(FrameType).all()
        }

        db_file = Files(path=path)
        session.add(db_file)

        reader = so3g.G3Reader(path)

        total_channels = 0
        file_start, file_stop = None, None
        frame_idx = 0
        while True:
            db_frame = Frame(frame_idx=frame_idx, file=db_file)
            db_frame.offset = reader.tell()

            frames = reader.Process(None)
            if not frames:
                break
            frame = frames[0]
            frame_idx += 1

            if str(frame.type) not in type_key:
                continue

            db_frame.frame_type = frame_types[str(frame.type)]

            timestamp = frame['time'].time / core.G3Units.s
            db_frame.time = dt.datetime.fromtimestamp(timestamp)

            data = frame.get('data')
            if data is not None:
                db_frame.samples = data.n_samples
                db_frame.channels = len(data)
                db_frame.start = dt.datetime.fromtimestamp(data.start.time / core.G3Units.s)
                db_frame.stop = dt.datetime.fromtimestamp(data.stop.time / core.G3Units.s)

                if file_start is None:
                    file_start = db_frame.start
                file_stop = db_frame.stop
                total_channels = max(total_channels, db_frame.channels)

            session.add(db_frame)

        db_file.start = file_start
        db_file.stop = file_stop
        db_file.channels = total_channels
        db_file.frames = frame_idx


    def index_archive(self, verbose=False, stop_at_error=False):
        """
        Adds all files from an archive to the sqlite database.

        Args
        ----
        verbose: bool
            Verbose mode
        stop_at_error: bool
            If True, will stop if there is an error indexing a file.
        """
        session = self.Session()
        indexed_files = [f[0] for f in session.query(Files.path).all()]

        files = []
        for root, _, fs in os.walk(self.archive_path):
            for f in fs:
                path = os.path.join(root, f)
                if path.endswith('.g3') and path not in indexed_files:
                    files.append(path)

        if verbose:
            print(f"Indexing {len(files)} files...")

        for f in tqdm(files):
            try:
                # `files` already holds the full paths built during the walk.
                self.add_file(f, session)
                session.commit()
            except IntegrityError as e:
                # Database Integrity Errors, such as duplicate entries
                session.rollback()
                print(e)
            except RuntimeError as e:
                # End of stream errors, for G3Files that were not fully flushed
                session.rollback()
                print(f"Failed on file {f} due to end of stream error!")
            except Exception as e:
                # This will catch generic errors such as attempting to load
                # out-of-date files that do not have the required frame
                # structure specified in the TOD2MAPS docs.
                session.rollback()
                if stop_at_error:
                    raise e
                elif verbose:
                    print(f"Failed on file {f}:\n{e}")

        session.close()

    def load_data(self, start, end, show_pb=True, load_biases=True):
        """
        Loads smurf G3 data for a given time range; the returned chunk of
        data fully covers the requested range.

        Args
        -----
            start (timestamp): start timestamp
            end   (timestamp): end timestamp
            show_pb (bool, optional): If True, will show progress bar.
            load_biases (bool, optional): If True, will return biases.

        Returns
        --------
            Returns a tuple ``SmurfData(times, data, primary, status, biases, timing_paradigm)``
            with the following fields:

                times (np.ndarray[samples]):
                    Array of unix timestamps for loaded data
                data (np.ndarray[channels, samples]):
                    Array of the squid phase in units of radians for each
                    channel with data in the specified time range. The index of
                    the array is the readout channel number.
                primary (Dict[np.ndarray]):
                    Dict of numpy arrays holding the "primary" data included
                    in the packet headers, including 'AveragingResetBits',
                    'Counter0', 'Counter1', 'Counter2', 'FluxRampIncrement',
                    'FluxRampOffset', 'FrameCounter', 'TESRelaySetting', and
                    'UnixTime'.
                status (SmurfStatus):
                    SmurfStatus object containing metadata info at the time of
                    the first Scan frame in the requested interval. If there
                    are no Scan frames in the interval, this will be None.
                biases (optional, np.ndarray[NTES, samples]):
                    An array containing the TES bias values.
                    If ``load_biases`` is False, this will be None.
                timing_paradigm (TimingParadigm):
                    Tells you the method used to extract timestamps from the
                    frame data.
        """
        session = self.Session()

        frames = session.query(Frame).filter(
            Frame.type_name == 'Scan',
            Frame.stop >= dt.datetime.fromtimestamp(start),
            Frame.start < dt.datetime.fromtimestamp(end)
        ).order_by(Frame.time)
        session.close()

        samples, channels = 0, 0
        num_frames = 0
        for f in frames:
            num_frames += 1
            samples += f.samples
            channels = max(f.channels, channels)

        timestamps = np.full((samples,), np.nan, dtype=np.float64)
        data = np.full((channels, samples), 0, dtype=np.int32)
        if load_biases:
            biases = np.full((num_bias_lines, samples), 0, dtype=np.int32)
        else:
            biases = None

        primary = {}

        cur_sample = 0
        cur_file = None
        timing_paradigm = None
        for frame_info in tqdm(frames, total=num_frames, disable=(not show_pb)):
            file = frame_info.file.path
            if file != cur_file:
                reader = so3g.G3Reader(file)
                cur_file = file

            reader.seek(frame_info.offset)
            frame = reader.Process(None)[0]
            nsamp = frame['data'].n_samples

            # Timestream keys encode the readout channel number after a
            # one-character prefix, so stripping the first character
            # recovers the channel index.
            key_order = [int(k[1:]) for k in frame['data'].keys()]
            data[key_order, cur_sample:cur_sample + nsamp] = frame['data']

            if load_biases:
                # Bias keys end in the two-digit bias line number.
                bias_order = [int(k[-2:]) for k in frame['tes_biases'].keys()]
                biases[bias_order, cur_sample:cur_sample + nsamp] = frame['tes_biases']

            # Loads primary data
            if 'primary' in frame.keys():
                for k, v in frame['primary'].items():
                    if k not in primary:
                        primary[k] = np.zeros(samples, dtype=np.int64)
                    primary[k][cur_sample:cur_sample + nsamp] = v

            ts, paradigm = get_sample_timestamps(frame)
            if timing_paradigm is None:
                timing_paradigm = paradigm
            elif timing_paradigm != paradigm:
                timing_paradigm = TimingParadigm.Mixed

            timestamps[cur_sample:cur_sample + nsamp] = ts

            cur_sample += nsamp

        # Conversion from DAC counts to squid phase
        rad_per_count = np.pi / 2**15
        data = data * rad_per_count

        if len(timestamps) > 0:
            status = self.load_status(timestamps[0])
        else:
            status = None

        SmurfData = namedtuple('SmurfData', 'times data primary status biases timing_paradigm')
        if load_biases:
            return SmurfData(timestamps, data, primary, status, biases, timing_paradigm)
        else:
            return SmurfData(timestamps, data, primary, status, None, timing_paradigm)

    def load_status(self, time, show_pb=False):
        """
        Returns the SMuRF status at a specified unix timestamp.
        Loads all status frames between the session start frame and the
        specified time.

        Args:
            time (timestamp): Time at which you want the rogue status

        Returns:
            status (SmurfStatus): SmurfStatus object built from the rogue
                variables at the specified time.
        """
        session = self.Session()
        session_start, = session.query(Frame.time).filter(
            Frame.type_name == 'Observation',
            Frame.time <= dt.datetime.fromtimestamp(time)
        ).order_by(Frame.time.desc()).first()

        status_frames = session.query(Frame).filter(
            Frame.type_name == 'Wiring',
            Frame.time >= session_start,
            Frame.time <= dt.datetime.fromtimestamp(time)
        ).order_by(Frame.time)

        status = {}
        cur_file = None
        for frame_info in tqdm(status_frames.all(), disable=(not show_pb)):
            file = frame_info.file.path
            if file != cur_file:
                reader = so3g.G3Reader(file)
                cur_file = file
            reader.seek(frame_info.offset)
            frame = reader.Process(None)[0]
            status.update(yaml.safe_load(frame['status']))

        return SmurfStatus(status)


class SmurfStatus:
    """
    This class attempts to extract essential information from the SMuRF
    status dictionary so it is more easily accessible. If the necessary
    information for an attribute is not present in the dictionary, the
    attribute will be set to None.

    Args
    -----
        status : dict
            A SMuRF status dictionary

    Attributes
    ------------
        status : dict
            Full smurf status dictionary
        num_chans : int
            Number of channels that are streaming
        mask : Optional[np.ndarray]
            Array with length ``num_chans`` that describes the mapping
            of readout channel to absolute smurf channel.
        mask_inv : np.ndarray
            Array with dimensions (NUM_BANDS, CHANS_PER_BAND) where
            ``mask_inv[band, chan]`` tells you the readout channel for a given
            band, channel combination.
        freq_map : Optional[np.ndarray]
            An array of size (NUM_BANDS, CHANS_PER_BAND) that has the mapping
            from (band, channel) to resonator frequency. If the mapping is not
            present in the status dict, the array will be full of np.nan.
        filter_a : Optional[np.ndarray]
            The A parameter of the readout filter.
        filter_b : Optional[np.ndarray]
            The B parameter of the readout filter.
        filter_gain : Optional[float]
            The gain of the readout filter.
        filter_order : Optional[int]
            The order of the readout filter.
        filter_enabled : Optional[bool]
            True if the readout filter is enabled.
        downsample_factor : Optional[int]
            Downsampling factor
        downsample_enabled : Optional[bool]
            Whether downsampler is enabled
        flux_ramp_rate_hz : float
            Flux Ramp Rate calculated from the RampMaxCnt and the digitizer
            frequency.
    """
    NUM_BANDS = 8
    CHANS_PER_BAND = 512

    def __init__(self, status):
        self.status = status

        # Reads in useful status values as attributes
        mapper_root = 'AMCc.SmurfProcessor.ChannelMapper'
        self.num_chans = self.status.get(f'{mapper_root}.NumChannels')

        # Tries to set values based on expected rogue tree
        self.mask = self.status.get(f'{mapper_root}.Mask')
        self.mask_inv = np.full((self.NUM_BANDS, self.CHANS_PER_BAND), -1)
        if self.mask is not None:
            self.mask = np.array(ast.literal_eval(self.mask))

            # Creates inverse mapping
            for i, chan in enumerate(self.mask):
                b = chan // self.CHANS_PER_BAND
                c = chan % self.CHANS_PER_BAND
                self.mask_inv[b, c] = i

        filter_root = 'AMCc.SmurfProcessor.Filter'
        self.filter_a = self.status.get(f'{filter_root}.A')
        if self.filter_a is not None:
            self.filter_a = np.array(ast.literal_eval(self.filter_a))
        self.filter_b = self.status.get(f'{filter_root}.B')
        if self.filter_b is not None:
            self.filter_b = np.array(ast.literal_eval(self.filter_b))
        self.filter_gain = self.status.get(f'{filter_root}.Gain')
        self.filter_order = self.status.get(f'{filter_root}.Order')
        self.filter_enabled = not self.status.get(f'{filter_root}.Disabled')

        ds_root = 'AMCc.SmurfProcessor.Downsampler'
        self.downsample_factor = self.status.get(f'{ds_root}.Factor')
        self.downsample_enabled = not self.status.get(f'{ds_root}.Disabled')

        # Tries to make resonator frequency map
        self.freq_map = np.full((self.NUM_BANDS, self.CHANS_PER_BAND), np.nan)
        band_roots = [
            f'AMCc.FpgaTopLevel.AppTop.AppCore.SysgenCryo.Base[{band}]'
            for band in range(self.NUM_BANDS)]
        for band in range(self.NUM_BANDS):
            band_root = band_roots[band]
            band_center = self.status.get(f'{band_root}.bandCenterMHz')
            subband_offset = self.status.get(f'{band_root}.toneFrequencyOffsetMHz')
            channel_offset = self.status.get(f'{band_root}.CryoChannels.centerFrequencyArray')

            # Skip band if one of these fields is None
            if None in [band_center, subband_offset, channel_offset]:
                continue

            subband_offset = np.array(ast.literal_eval(subband_offset))
            channel_offset = np.array(ast.literal_eval(channel_offset))
            self.freq_map[band] = band_center + subband_offset + channel_offset

        # Calculates flux ramp reset rate (pulled from pysmurf's code)
        rtm_root = 'AMCc.FpgaTopLevel.AppTop.AppCore.RtmCryoDet'
        ramp_max_cnt = self.status.get(f'{rtm_root}.RampMaxCnt')
        if ramp_max_cnt is None:
            self.flux_ramp_rate_hz = None
        else:
            digitizer_freq_mhz = float(self.status.get(
                f'{band_roots[0]}.digitizerFrequencyMHz', 614.4))
            ramp_max_cnt_rate_hz = 1.e6*digitizer_freq_mhz / 2.
            self.flux_ramp_rate_hz = ramp_max_cnt_rate_hz / (ramp_max_cnt + 1)

    def readout_to_smurf(self, rchan):
        """
        Converts from a readout channel number to (band, channel).

        Args
        -----
            rchan : int or List[int]
                Readout channel to convert. If a list or array is passed,
                this will return an array of bands and an array of smurf
                channels.

        Returns
        --------
            band, channel : (int, int) or (List[int], List[int])
                The band, channel combination that has readout channel
                ``rchan``.
        """
        abs_smurf_chan = self.mask[rchan]
        return (abs_smurf_chan // self.CHANS_PER_BAND,
                abs_smurf_chan % self.CHANS_PER_BAND)

    def smurf_to_readout(self, band, chan):
        """
        Converts from (band, channel) to a readout channel number.
        If the channel is not streaming, returns -1.

        Args:
            band : int, List[int]
                The band number, or list of band numbers corresponding to
                the channel input array.
            chan : int, List[int]
                Channel number or list of channel numbers.
        """
        return self.mask_inv[band, chan]
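
A minimal usage sketch (not part of the source file above): the archive path, timestamps, and channel numbers are hypothetical, and the import path is assumed from the file's location in the repository.

    from so3g.smurf.smurf_archive import SmurfArchive  # import path assumed

    # Build or open the sqlite index at <archive_path>/frames.db, then
    # index any .g3 files that have not been indexed yet.
    arch = SmurfArchive('/data/smurf_archive')  # hypothetical path
    arch.index_archive(verbose=True)

    # Load a 10-minute chunk of data (hypothetical unix timestamps).
    dat = arch.load_data(1580000000, 1580000600)
    dat.times.shape    # (samples,)
    dat.data.shape     # (channels, samples), squid phase in radians
    dat.biases.shape   # (num_bias_lines, samples), i.e. 16 TES bias lines

    # Channel mapping via the status object: with CHANS_PER_BAND = 512, an
    # absolute smurf channel of 1029 corresponds to band 2, channel 5.
    band, chan = dat.status.readout_to_smurf(5)
    rchan = dat.status.smurf_to_readout(band, chan)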