simonsobs / so3g / 6187544102

14 Sep 2023 03:20PM UTC coverage: 47.602% (-0.3%) from 47.864%

Build 6187544102 (push, github, web-flow)
Merge pull request #156 from simonsobs/load-range-level3
Add ability for load_range() to read HK books

15 of 15 new or added lines in 1 file covered. (100.0%)
1221 of 2565 relevant lines covered (47.6%)
0.48 hits per line

Source File: /python/hk/getdata.py (62.11%)
"""Interface for querying and loading contiguous vectors from
Housekeeping frame streams.  We want to expose a sisock-compatible
API, where time ranges are an essential part of every query.

Use HKArchiveScanner to scan a set of G3 files containing Housekeeping
frames.  Use the resulting HKArchive to perform random I/O over a
sisock-style API.

"""
import itertools
import os
import pytz
import yaml
import logging
import pickle

import numpy as np
import datetime as dt


import so3g
from spt3g import core


hk_logger = logging.getLogger(__name__)
hk_logger.setLevel(logging.INFO)

SPAN_BUFFER_SECONDS = 1.0

def is_sub_seq(full_seq, sub_seq):
    """Return true if sub_seq is a sub-sequence of full_seq.

    """
    if len(sub_seq) == 0:
        return True
    for idx0,x0 in enumerate(full_seq):
        if x0 == sub_seq[0]:
            if is_sub_seq(full_seq[idx0+1:], sub_seq[1:]):
                return True
    return False

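
# Illustrative sketch, not part of the original module: how the "."-tokenized
# short-match rule (see HKArchive._get_groups below) reduces to is_sub_seq.
# The field name is hypothetical.
def _example_is_sub_seq():
    full = 'obs.agentX.feeds.therms.c1'.split('.')
    assert is_sub_seq(full, 'agentX.c1'.split('.'))      # sub-sequence -> match
    assert not is_sub_seq(full, 'c1.agentX'.split('.'))  # order matters -> no match
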

_SCHEMA_V1_BLOCK_TYPES = [
    core.G3VectorDouble,
    core.G3VectorInt,
    core.G3VectorString,
    core.G3VectorBool,
]


def _concat_hk_stream(blocks_in):
    """Concatenates an ordered list of compatible HK blocks into a single
    frame.  Each block should be a valid G3TimesampleMap with the same
    keys.

    Returns a single G3TimesampleMap with all fields concatenated.

    """
    blk = core.G3TimesampleMap()
    blk.times = core.G3VectorTime(blocks_in[0].times)
    fields = list(blocks_in[0].keys())
    for f in fields:
        f_short = f.split('.')[-1]
        blk[f] = blocks_in[0][f_short]
    for b in blocks_in[1:]:
        blk.times.extend(b.times)
    for f in fields:
        f_short = f.split('.')[-1]
        for _type in _SCHEMA_V1_BLOCK_TYPES:
            if isinstance(blocks_in[0][f_short], _type):
                break
        else:
            raise RuntimeError('Field "%s" is of unsupported type %s.' %
                               (f_short, type(blocks_in[0][f_short])))
        for b in blocks_in[1:]:
            blk[f].extend(b[f_short])
    return blk

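
# Illustrative sketch, not part of the original module: _concat_hk_stream is the
# helper that HKArchive.get_data(raw=True) uses to merge the per-frame blocks of
# each co-sampled group into one G3TimesampleMap.  'arc' is assumed to be an
# HKArchive (see the HKArchiveScanner example further down).
def _example_raw_blocks(arc):
    for group_name, block in arc.get_data(raw=True):
        # Each 'block' is a single concatenated G3TimesampleMap for the group.
        print(group_name, len(block.times), list(block.keys()))
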

class HKArchive:
    """Container for information necessary to determine what data fields
    are present in a data archive at what times.  This object has
    methods that can determine what fields have data over a given time
    range, and can group fields that share a timeline (i.e. are
    co-sampled) over that range.

    """
    field_groups = None
    def __init__(self, field_groups=None):
        if field_groups is not None:
            self.field_groups = list(field_groups)
        # A translator is used to update frames, on the fly, to the
        # modern schema assumed here.
        self.translator = so3g.hk.HKTranslator()

    def _get_groups(self, fields=None, start=None, end=None,
                    short_match=False):
        """Helper for get_fields and get_data.  Determines which fields, of
        those listed in fields, are present in the archive between the
        specified times.

        Args:
            fields (list of strings): List of field names.  If None,
              all fields are considered.
            start (timestamp): Earliest time to consider.  If None,
              consider arbitrarily early times.
            end (timestamp): Latest time to consider.  If None,
              consider arbitrarily late times.  Note that ``start``
              and ``end`` form a semi-closed interval that includes
              start but excludes end.
            short_match (bool): If True, then a requested field will
              be considered to match an archive field if its
              "."-tokenized form is a sub-sequence of the
              "."-tokenized field in the archive.  For example, the
              archive field "obs.agentX.feeds.therms.c1" may be
              matched by the requested field "agentX.c1".  In the case
              that multiple archive fields match a requested field, a
              ValueError is thrown.

        Returns:
            List of (group_name, group_fields, fgs).  The
            ``group_name`` is internally generated... probably
            something like "group0".  ``group_fields`` is a list of
            fields belonging to this group.  ``fgs`` is a list of
            _FieldGroup objects relevant to the fields in this
            group.

        Notes:
            Each entry in the returned list carries information for a
            set of fields that are co-sampled over the requested time
            interval.  Each of the requested fields will thus appear
            in at most one group.

            If a requested field is not found, it will not be listed
            in any group.

        """
        span = so3g.IntervalsDouble()
        if start is None:
            start = span.domain[0]
        if end is None:
            end = span.domain[1]
        span.add_interval(start, end)

        if short_match and (fields is not None):
            field_seqs = [f.split('.') for f in fields]
            short_match_map = {}  # map from shortened name to full field name.

        field_map = {}
        for fg in self.field_groups:
            both = span * fg.cover
            if len(both.array()) > 0:
                for f in fg.fields:
                    full_field = fg.prefix + '.' + f
                    key_field = full_field
                    if fields is not None:
                        # User is interested only in particular fields.
                        if short_match:
                            for seq in field_seqs:
                                if is_sub_seq(full_field.split('.'), seq):
                                    key_field = '.'.join(seq)
                                    prev_short_match = short_match_map.get(key_field)
                                    if prev_short_match not in [None, full_field]:
                                        raise ValueError("Multiple matches for soft key: %s [%s, %s]." %
                                                         (key_field, prev_short_match, full_field))
                                    short_match_map[key_field] = full_field
                                    break
                            else:
                                continue
                        elif full_field not in fields:
                            continue
                    if not key_field in field_map:
                        field_map[key_field] = [fg]
                    else:
                        field_map[key_field].append(fg)

        # Sort each list of field_groups by object id -- all we care
        # about is whether two fields have the same field group set.
        [f.sort(key=lambda obj: id(obj)) for f in field_map.values()]

        # Now group together fields if they have identical
        # field_group lists (because when they do, they can share a
        # timeline).
        keys = list(field_map.keys())
        grouped = []
        i0, i1 = 0, 1
        while i0 < len(keys):
            while i1 < len(keys) and field_map[keys[i0]] == field_map[keys[i1]]:
                i1 += 1
            group_keys = keys[i0:i1]
            group_name = 'group%i' % len(grouped)
            grouped.append((group_name, group_keys, field_map[group_keys[0]]))
            i0, i1 = i1, i1+1
        return grouped

    def get_fields(self, start=None, end=None):
        """Get list of fields that might have a sample in the time interval
        [start,end).

        Returns the pair of dictionaries ``(fields, timelines)``.

        The ``fields`` dictionary is a map from field name to a block
        of field information.  The ``timelines`` dictionary is a map
        from timeline name to a block of timeline information.

        """
        grouped = self._get_groups(None, start, end)
        # Construct the fields and timelines dicts.
        field_map, timeline_map = {}, {}
        for (group_name, fields, data) in grouped:
            timeline_map[group_name] = {'interval': None, 'field': fields}
            for f in fields:
                field_map[f] = {
                    'description': None,
                    'timeline': group_name,
                    'type': 'number',
                    'units': None,
                }
        return field_map, timeline_map

    def get_data(self, field=None, start=None, end=None, min_stride=None,
                 raw=False, short_match=False):
        """Load data from specified field(s) between specified times.

        Arguments ``field``, ``start``, ``end``, ``short_match`` are
        as described in _get_groups.

        Arguments:
            min_stride (float): Specifies the minimum sample spacing,
                in seconds.  Ignored in this implementation.
            raw (bool): If true, return G3 blocks instead of numpy
                arrays.

        Returns:
            Pair of dictionaries, (data, timelines).  The ``data``
            dictionary is a simple map from field name to a numpy
            array of readings.  The ``timelines`` dictionary is a map
            from field group name to a dictionary of timeline
            information, which has entries:

            - ``'t'``: numpy array of timestamps
            - ``'fields'``: list of fields belonging to this group.
            - ``'finalized_until'``: in cases where the data are still
              in flux, this field provides a timestamp that may be
              taken as the earliest time that needs to be requeried.
              This is part of the interface in order to support data
              streams that are being updated in real time.

            If user requested raw=True, then return value is a list
            of tuples of the form (group_name, block) where block is a
            single G3TimesampleMap carrying all the data for that
            co-sampled group.

        """
        grouped = self._get_groups(field, start, end, short_match=short_match)
        hk_logger.debug('get_data: _get_groups returns %i groups.' % len(grouped))

        # Pass through the metadata.  Collect information on what
        # field groups are present in what frames of what files; sort
        # that info by file and offset so we make a single monotonic
        # pass through the frames.
        group_info = {
            #  group_name: {'types': [dtype, ...],
            #               'fields': [(full_name, short_name), ...],
            #               'count': n},
            #  ...
        }
        files = {
            # filename: {
            #   offset: [(block_index, group_name, output_offset), ...]],
            #   ...
            #   },
            # ...
        }

        def check_overlap(time_range):
            # Note the time_range is inclusive on both ends.
            return ((start is None or start <= time_range[1]) and
                    (end is None or end > time_range[0]))

        for group_name, fields, fgrps in grouped:
            # This is a group of co-sampled fields.  The fields share
            # a sample count and a frame-index map.
            all_frame_refs = []
            for fg in fgrps:
                all_frame_refs.extend(
                    [(b['time_range'], b['count'], b['filename'], b['byte_offset'], b['block_index'])
                     for b in fg.index_info])
            all_frame_refs.sort()
            vector_offset = 0
            for time_range, n, filename, byte_offset, block_index in all_frame_refs:
                if not check_overlap(time_range):
                    continue
                if not filename in files:
                    files[filename] = {}
                if byte_offset not in files[filename]:
                    files[filename][byte_offset] = []
                files[filename][byte_offset].append((block_index, group_name, vector_offset))
                vector_offset += n
            group_info[group_name] = {
                'count': vector_offset,
                'fields': [(f, f.split('.')[-1]) for f in fields],
                'types': [],
            }

        # Pass through the data.  Should read the files in order,
        # jumping monotonically through the needed frames.  The data
        # type of output arrays is determined from whatever
        # np.array(G3Object) returns on the first block.  Note strings
        # ('U') have to be handled differently because we can't know
        # the maximum string length from the first block.
        data = {}
        timelines = {}
        for filename, file_map in sorted(files.items()):
            hk_logger.debug('get_data: reading %s' % filename)
            reader = so3g.G3IndexedReader(filename)
            for byte_offset, frame_info in sorted(file_map.items()):
                # Seek and decode.
                hk_logger.debug('get_data: seeking to %i for %i block extractions' %
                                (byte_offset, len(frame_info)))
                reader.Seek(byte_offset)
                frames = reader.Process(None)
                assert(len(frames) == 1)
                frames = self.translator(frames[0])
                frame = frames[0]
                # Now expand all blocks.
                for block_index, group_name, offset in frame_info:
                    block = frame['blocks'][block_index]
                    gi = group_info[group_name]
                    if raw:
                        # Short-circuit if in raw mode, just collect all blocks.
                        if group_name not in data:
                            data[group_name] = {}
                            for field, f_short in gi['fields']:
                                data[group_name] = []
                        data[group_name].append(block)
                        continue
                    if group_name not in timelines:
                        # This block is init that happens only once per group.
                        timelines[group_name] = {
                            't_g3': np.zeros(gi['count'], dtype=np.int64),
                            'fields': [f for f,s in gi['fields']],
                        }
                        hk_logger.debug('get_data: creating group "%s" with %i fields'
                                        % (group_name, len(gi['fields'])))
                        # Determine data type of each field and create output arrays.
                        for field, f_short in gi['fields']:
                            dtype = np.array(block[f_short]).dtype
                            gi['types'].append(dtype)
                            if dtype.char == 'U':
                                data[field] = []
                            else:
                                data[field] = np.empty(gi['count'], dtype)
                            hk_logger.debug('get_data: field "%s" has type %s' % (
                                f_short, dtype))
                    # Copy in block data.
                    n = len(block.times)
                    # Note this is in G3 time units for now... fixed later.
                    timelines[group_name]['t_g3'][offset:offset+n] = block.times
                    for (field, f_short), dtype in zip(gi['fields'], gi['types']):
                        if dtype.char == 'U':
                            data[field].append((offset, list(map(str, block[f_short]))))
                        else:
                            # This is a relatively quick copy because
                            # of buffer pass-through from G3... don't
                            # hit the RHS with np.array!
                            data[field][offset:offset+n] = block[f_short]

        if raw:
            return [(group_name, _concat_hk_stream(data[group_name]))
                    for group_name, _, _ in grouped]

        # Finalize string fields.
        for group_name, fields, fgrps in grouped:
            gi = group_info[group_name]
            for (field, f_short), dtype in zip(gi['fields'], gi['types']):
                if dtype.char == 'U':
                    data[field] = np.array(list(itertools.chain(
                        *[x for i, x in sorted(data[field])])))
                    assert(len(data[field]) == gi['count'])

        # Scale out time units.
        for timeline in timelines.values():
            timeline['t'] = timeline.pop('t_g3') / core.G3Units.seconds

        # Restrict to only the requested time range.
        if start is not None or end is not None:
            for timeline in timelines.values():
                i0, i1 = 0, len(timeline['t'])
                if start is not None:
                    i0 = np.searchsorted(timeline['t'], start)
                if end is not None:
                    i1 = np.searchsorted(timeline['t'], end)
                sl = slice(i0, i1)
                timeline['t'] = timeline['t'][sl]
                for k in timeline['fields']:
                    data[k] = data[k][sl]

        # Mark last time
        for timeline in timelines.values():
            if len(timeline['t']):
                timeline['finalized_until'] = timeline['t'][-1]
            else:
                timeline['finalized_until'] = start if start is not None else 0.

        return (data, timelines)

    def simple(self, fields=None, start=None, end=None, min_stride=None,
               raw=False, short_match=True):
        """Load data from specified field(s) between specified times, and
        unpack the data for ease of use.  Use get_data if you want to
        preserve the co-sampling structure.

        Arguments ``field``, ``start``, ``end``, ``short_match`` are
        as described in _get_groups.  However, ``fields`` can be a
        single string rather than a list of strings.

        Note that ``short_match`` defaults to True (which is not the
        case for get_data).

        Returns:
            List of pairs of numpy arrays (t,y) corresponding to each
            field in the ``fields`` list.  If ``fields`` is a string,
            a simple pair (t,y) is returned.  ``t`` and ``y`` are
            numpy arrays of equal length containing the timestamps and
            field readings, respectively.  In cases where two fields
            are co-sampled, the time vector will be the same object.

            In cases where there are no data for the requested field
            in the time range, a pair of length 0 float arrays is returned.

        """
        unpack = isinstance(fields, str)
        if unpack:
            fields = [fields]
        data, timelines = self.get_data(fields, start, end, min_stride, raw, short_match)
        output = {}
        fields_not_found = [f for f in fields]
        for timeline in timelines.values():
            # Make the array here, so that the same object is returned
            # for all fields in this group.
            _t = np.array(timeline['t'])
            for f in timeline['fields']:
                output[f] = (_t, np.array(data[f]))
                fields_not_found.remove(f)
        nothing = np.zeros((0,))
        for f in fields_not_found:
            output[f] = (nothing, nothing)
        output = [output[f] for f in fields]
        if unpack:
            output = output[0]
        return output

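
# Illustrative sketch, not part of the original module: typical read-side use of
# an HKArchive.  'arc' is assumed to have been produced by
# HKArchiveScanner.finalize() (see below); the field names are hypothetical.
def _example_hkarchive_queries(arc, t0, t1):
    # What fields have samples in [t0, t1)?
    fields, timelines = arc.get_fields(t0, t1)
    # Co-sampled form: numpy arrays keyed by field, plus per-group timelines.
    data, tl = arc.get_data(['observatory.HAN.feeds.temperatures.Channel 1 T'],
                            start=t0, end=t1)
    # Unpacked form; simple() defaults to short_match=True, so a shortened
    # "."-tokenized name is accepted.
    t, y = arc.simple('temperatures.Channel 1 T', start=t0, end=t1)
    return t, y
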

class _HKProvider:
    def __init__(self, prov_id, prefix):
        self.prov_id = prov_id
        self.prefix = prefix
        self.blocks = {}

    @classmethod
    def from_g3(cls, element):
        prov_id = element['prov_id'].value
        prefix = element['description'].value
        return cls(prov_id, prefix)

class HKArchiveScanner:
    """Consumes SO Housekeeping streams and creates a record of what fields
    cover what time ranges.  This can run as a G3Pipeline module, but
    will not be able to record stream indexing information in that
    case.  If it's populated through the process_file method, then
    index information (in the sense of filenames and byte offsets)
    will be stored.

    After processing frames, calling .finalize() will return an
    HKArchive that can be used to load data more efficiently.

    """
    def __init__(self, pre_proc_dir=None, pre_proc_mode=None):
        self.session_id = None
        self.providers = {}
        self.field_groups = []
        self.frame_info = []
        self.counter = -1
        self.translator = so3g.hk.HKTranslator()
        self.pre_proc_dir = pre_proc_dir
        self.pre_proc_mode = pre_proc_mode

    def __call__(self, *args, **kw):
        return self.Process(*args, **kw)

    def Process(self, f, index_info=None):
        """Processes a frame.  Only Housekeeping frames will be examined;
        other frames will simply be counted.  All frames are passed
        through unmodified.  The index_info will be stored along with
        a description of the frame's data; see the .process_file
        function.

        """
        self.counter += 1
        if index_info is None:
            index_info = {'counter': self.counter}

        f = self.translator(f)
        assert(len(f) == 1)
        f = f[0]

        if f.type == core.G3FrameType.EndProcessing:
            return [f]

        if f.type != core.G3FrameType.Housekeeping:
            return [f]

        vers = f.get('hkagg_version', 0)
        assert(vers == 2)

        if f['hkagg_type'] == so3g.HKFrameType.session:
            session_id = f['session_id']
            if self.session_id is not None:
                if self.session_id != session_id:
                    self.flush()  # Note that this sets self.session_id=None.
            if self.session_id is None:
                core.log_info('New HK Session id = %i, timestamp = %i' %
                              (session_id, f['start_time']), unit='HKScanner')
                self.session_id = session_id

        elif f['hkagg_type'] == so3g.HKFrameType.status:
            # If a provider has disappeared, flush its information into a
            # FieldGroup.
            prov_cands = [_HKProvider.from_g3(p) for p in f['providers']]
            to_flush = list(self.providers.keys())  # prov_ids...
            for p in prov_cands:
                if p.prov_id in to_flush:
                    to_flush.remove(p.prov_id) # no, don't.
                else:
                    self.providers[p.prov_id] = p
            for prov_id in to_flush:
                self.flush([prov_id])

        elif f['hkagg_type'] == so3g.HKFrameType.data:
            # Data frame -- merge info for this provider.
            prov = self.providers[f['prov_id']]
            representatives = prov.blocks.keys()

            for bidx, (bname, b) in enumerate(zip(f['block_names'], f['blocks'])):
                assert(isinstance(b, core.G3TimesampleMap))
                if bname not in prov.blocks:
                    prov.blocks[bname] = {'fields': list(b.keys()),
                                          'start': b.times[0].time / core.G3Units.seconds,
                                          'index_info': []}
                # To ensure that the last sample is actually included
                # in the semi-open intervals we use to track frames,
                # the "end" time has to be after the final sample.
                prov.blocks[bname]['end'] = b.times[-1].time / core.G3Units.seconds + SPAN_BUFFER_SECONDS
                ii = {'block_index': bidx,
                      'time_range': (b.times[0].time / core.G3Units.seconds,
                                     b.times[-1].time / core.G3Units.seconds),
                      'count': len(b.times)}
                ii.update(index_info)
                prov.blocks[bname]['index_info'].append(ii)

        else:
            core.log_warn('Weird hkagg_type: %i' % f['hkagg_type'],
                          unit='HKScanner')
        return [f]

    def flush(self, provs=None):
        """Convert any cached provider information into _FieldGroup
        information.  Delete the provider information.  This will be
        called automatically during frame processing if a provider
        session ends.  Once frame processing has completed, this
        function should be called, with no arguments, to flush any
        remaining provider sessions.

        Args:
            provs (list or None): list of provider identifiers to
              flush.  If None, flush all and also reset the
              self.session_id.

        Returns:
            None

        """
        if provs is None:
            provs = list(self.providers.keys())
            self.session_id = None
        for p in provs:
            prov = self.providers.pop(p)
            blocks = prov.blocks
            for k, info in blocks.items():
                fg = _FieldGroup(prov.prefix, info['fields'], info['start'],
                                 info['end'], info['index_info'])
                self.field_groups.append(fg)

    def finalize(self):
        """Finalize the processing by calling flush(), then return an
        HKArchive with all the _FieldGroup information from this
        scanning session.
        """
        self.flush()
        return HKArchive(self.field_groups)

    def process_file(self, filename, flush_after=True):
        """Process the file specified by ``filename`` using a G3IndexedReader.
        Each frame from the file is passed to self.Process, with the
        optional index_info argument set to a dictionary containing
        the filename and byte_offset of the frame.

        Internal data grouping will be somewhat cleaner if the
        multiple files from a single aggregator "session" are passed
        to this function in acquisition order.  In that case, call
        with flush_after=False.

        """
        reader = so3g.G3IndexedReader(filename)
        while True:
            info = {'filename': filename,
                    'byte_offset': reader.Tell()}
            frames = reader.Process(None)
            assert len(frames) <= 1
            if len(frames) == 0:
                break
            self.Process(frames[0], info)
        # Calling flush() here protects us against the odd case that
        # we process files from a single session in non-consecutive
        # order.  In that case the 'start' and 'end' times will get
        # messed up because we can't tell the stream has been
        # re-initialized.
        if flush_after:
            self.flush()

    def process_file_with_cache(self, filename):
        """Processes the file specified by ``filename`` using the process_file
           method above.  If self.pre_proc_dir is specified (not None), it
           will load pickled HKArchiveScanner objects and concatenate them
           with self instead of re-processing each frame, if the corresponding
           file exists.  If the pkl file does not exist, it processes it and
           saves the result (in the pre_proc_dir) so it can be used in the
           future.  If self.pre_proc_dir is not specified, this becomes
           equivalent to process_file.
        """
        if self.pre_proc_dir is None:
            self.process_file(filename)
            return

        folder = os.path.basename(filename)[:5]
        path = os.path.join( self.pre_proc_dir, folder, os.path.basename(filename).replace(".g3",'.pkl') )

        if os.path.exists(path):
            with open(path, 'rb') as pkfl:
                hksc = pickle.load(pkfl)

        else:
            hksc = HKArchiveScanner()
            hksc.process_file(filename)
            # Make dirs if needed
            if not os.path.exists( os.path.dirname(path) ):
                os.makedirs( os.path.dirname(path) )
                if self.pre_proc_mode is not None:
                    os.chmod( os.path.dirname(path), self.pre_proc_mode )
            # Save pkl file
            with open(path, 'wb') as pkfl:
                pickle.dump(hksc, pkfl)
            if self.pre_proc_mode is not None:
                os.chmod( path, self.pre_proc_mode )

        self.field_groups += hksc.field_groups
        self.counter += hksc.counter

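
# Illustrative sketch, not part of the original module: building an HKArchive by
# scanning a set of HK .g3 files.  The file list and cache directory are
# hypothetical.
def _example_scan_hk_files():
    hkas = HKArchiveScanner(pre_proc_dir='/tmp/hk_preproc_cache')
    for filename in ['/data/16855/1685500000.g3', '/data/16855/1685510000.g3']:
        # With pre_proc_dir set, a pickled scan of each file is cached and reused.
        hkas.process_file_with_cache(filename)
    arc = hkas.finalize()   # returns an HKArchive
    return arc
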

class _FieldGroup:
    """Container object for look-up information associated with a group of
    fields that share a timeline (i.e. a group of fields that are
    co-sampled over some requested time range).

    Attributes:
        prefix (str): Not used.
        fields (list of str): The field names.
        cover (IntervalsDouble): The time range (as unix timestamps)
          over which these fields have data.  This is expressed as an
          IntervalsDouble to enable the use of Intervals arithmetic.
          The range is created from the "start" and "end" arguments of
          the constructor.
        index_info (list of dict): Information that the consumer will
          use to locate and load the data efficiently.  The entries in
          the list represent time-ordered frames.  See Notes.

    Notes:

      Each entry of index_info is a dict providing information about a
      single frame and block where data can be found.  The fields are:

      - 'filename' (str): The file in which the frame is located.
      - 'byte_offset' (int): The offset within the file at which to
        find the frame.
      - 'block_index' (int): The index of the block, within the
        corresponding frame, where the data are found.
      - 'count' (int): the number of samples in this block.
      - 'time_range' (tuple): (first time, last time), as unix
        timestamps.  Note the last time is the time of the last
        sample, not some time beyond that.
      - 'counter' (int): An index providing the order in which the
        frames were scanned.

    """
    def __init__(self, prefix, fields, start, end, index_info):
        self.prefix = prefix
        self.fields = list(fields)
        self.cover = so3g.IntervalsDouble().add_interval(start, end)
        self.index_info = index_info
    def __repr__(self):
        try:
            return '_FieldGroup("%s", %i fields, %i segments)' % (
                self.prefix, len(self.fields), len(self.index_info))
        except:
            return '_FieldGroup(<bad internal state!>)'

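
# Illustrative sketch, not part of the original module: the shape of a
# _FieldGroup and one index_info entry as described in the docstring above.
# The filename, offsets, and field names are hypothetical.
def _example_field_group():
    index_info = [{
        'filename': '/data/16855/1685500000.g3',
        'byte_offset': 102400,
        'block_index': 0,
        'count': 600,
        'time_range': (1685500000.0, 1685500599.0),
        'counter': 3,
    }]
    return _FieldGroup('observatory.HAN.feeds.temperatures',
                       ['Channel 1 T', 'Channel 2 T'],
                       1685500000.0, 1685500600.0, index_info)
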

def to_timestamp(some_time, str_format=None):
    """Convert the argument to a unix timestamp.

    Args:
      some_time: If a datetime, it is converted to UTC timezone and
        then to a unix timestamp.  If int or float, the value is
        returned unprocessed.  If str, a date will be extracted based
        on a few trial date format strings.
      str_format: a format string (for strptime) to try, instead of
        the default(s).

    Returns:
        float: Unix timestamp corresponding to some_time.

    """

    if type(some_time) == dt.datetime:
        return some_time.astimezone(dt.timezone.utc).timestamp()
    if type(some_time) == int or type(some_time) == float:
        return some_time
    if type(some_time) == str:
        if str_format is not None:
            return to_timestamp(pytz.utc.localize( dt.datetime.strptime(some_time, str_format )))
        str_options = ['%Y-%m-%d', '%Y-%m-%d %H:%M', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f']
        for option in str_options:
            try:
                return to_timestamp(pytz.utc.localize( dt.datetime.strptime(some_time, option )))
            except:
                continue
        raise ValueError('Could not process string into date object, options are: {}'.format(str_options))

    raise ValueError('Type of date / time indication is invalid, accepts datetime, int, float, and string')
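

# Illustrative sketch, not part of the original module: the three input types
# accepted by to_timestamp.
def _example_to_timestamp():
    t1 = to_timestamp(1600000000)                    # int/float passes through
    t2 = to_timestamp(dt.datetime(2020, 2, 19, 18, 48, tzinfo=dt.timezone.utc))
    t3 = to_timestamp('2020-02-19 18:48')            # parsed with the default formats, assumed UTC
    return t1, t2, t3
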

def load_range(start, stop, fields=None, alias=None,
               data_dir=None, config=None, pre_proc_dir=None, pre_proc_mode=None,
               daq_node=None, strict=True):
    """Args:

      start: Earliest time to search for data (see note on time
        formats).
      stop: Latest time to search for data (see note on time formats).
      fields: Fields to return.  If None, all fields are returned.
      alias: If not None, must be a list of strings providing exactly
        one value for each entry in fields.
      data_dir: directory where all the ctime folders are.  If None,
        tries to use $OCS_DATA_DIR.
      config: filename of a .yaml file for loading data_dir / fields /
        alias.
      pre_proc_dir: place to store pickled HKArchiveScanners for g3
        files, to speed up loading.
      pre_proc_mode: permissions (passed to os.chmod) to be used on
        dirs and pkl files in the pre_proc_dir.  No chmod if None.
      daq_node: name of the type of HK book (e.g. satp1, lat, site) to
        load, for use when the daq node name is not part of data_dir.
        If None, the daq node name is taken from data_dir, or .g3
        files are loaded directly.
      strict: If False, log and skip missing fields rather than
        raising a KeyError.

    Returns:

      Dictionary with structure::

        {
            alias[i] : (time[i], data[i])
        }

      It will be masked to only have data between start and stop.

    Notes:

      The "start" and "stop" arguments accept a variety of formats,
      including datetime objects, unix timestamps, and strings (see
      to_timestamp function).  In the case of datetime objects, you
      should set tzinfo=dt.timezone.utc explicitly if the system is
      not set to UTC time.

      Example usage::

        fields = [
            'observatory.HAN.feeds.temperatures.Channel 1 T',
            'observatory.HAN.feeds.temperatures.Channel 2 T',
        ]

        alias = [
            'HAN 1', 'HAN 2',
        ]

        start = dt.datetime(2020,2,19,18,48)
        stop = dt.datetime(2020,2,22)
        data = load_range(start, stop, fields=fields, alias=alias)

        plt.figure()
        for name in alias:
            plt.plot( data[name][0], data[name][1])

    """
    if config is not None:
        if not (data_dir is None and fields is None and alias is None):
            hk_logger.warning('''load_range has a config file - data_dir, fields, and alias are ignored''')
        with open(config, 'r') as f:
            setup = yaml.load(f, Loader=yaml.FullLoader)

        if 'data_dir' not in setup.keys():
            raise ValueError('load_range config file requires data_dir entry')
        data_dir = setup['data_dir']
        if 'field_list' not in setup.keys():
            raise ValueError('load_range config file requires field_list entry')
        fields = []
        alias = []
        for k in setup['field_list']:
            fields.append( setup['field_list'][k])
            alias.append( k )

    if data_dir is None and 'OCS_DATA_DIR' not in os.environ.keys():
        raise ValueError('if $OCS_DATA_DIR is not defined a data directory must be passed to getdata')
    if data_dir is None:
        data_dir = os.environ['OCS_DATA_DIR']

    hk_logger.debug('Loading data from {}'.format(data_dir))

    start_ctime = to_timestamp(start) - 3600
    stop_ctime = to_timestamp(stop) + 3600

    hksc = HKArchiveScanner(pre_proc_dir=pre_proc_dir)

    node_options = ['satp1', 'satp2', 'satp3', 'lat', 'site']
    for i in node_options:
        if i in data_dir:
            node = i

    for folder in range( int(start_ctime/1e5), int(stop_ctime/1e5)+1):
        if daq_node is None:
            if node in data_dir:
                book_path = 'hk_'+str(folder)+'_'+node
                base = data_dir+'/'+str(book_path)
            else:
                hk_logger.debug(f'No daq node info provided in {data_dir}, and '
                                'daq_node arg is None; going to assume data_dir '
                                'points to .g3 files')
                # assumes .g3 files but should be more explicit
                base = data_dir+'/'+str(folder)
        else:
            book_path = 'hk_'+str(folder)+'_'+daq_node
            base = data_dir+'/'+str(book_path)

        if not os.path.exists(base):
            hk_logger.debug('{} does not exist, skipping'.format(base))
            continue

        for file in sorted(os.listdir(base)):
            for i in file.split('.'):
                if i == '.yaml':
                    continue
            try:
                t = int(file[:-3])
            except:
                hk_logger.debug('{} does not have the right format, skipping'.format(file))
                continue
            if t >= start_ctime-3600 and t <= stop_ctime+3600:
                hk_logger.debug('Processing {}'.format(base+'/'+file))
                hksc.process_file_with_cache( base+'/'+file)

    cat = hksc.finalize()
    start_ctime = to_timestamp(start)
    stop_ctime = to_timestamp(stop)

    all_fields,_ = cat.get_fields()

    if fields is None:
        fields = all_fields
    if alias is not None:
        if len(alias) != len(fields):
            hk_logger.error('if provided, alias needs to be the length of fields')
    else:
        alias = fields

    # Single pass load.
    keepers = []
    for name, field in zip(alias, fields):
        if field not in all_fields:
            hk_logger.debug('`{}` not in available fields, skipping'.format(field))
            continue
        keepers.append((name, field))
    data = cat.simple([f for n, f in keepers],
                      start=start_ctime, end=stop_ctime)
    data = {name: data[i] for i, (name, field) in enumerate(keepers)}

    return data
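

# Illustrative sketch, not part of the original module: a minimal YAML config
# for load_range(config=...), using the two keys the function reads
# ('data_dir' and 'field_list').  The paths and field names are hypothetical.
#
#   data_dir: /path/to/hk/data
#   field_list:
#     'HAN 1': 'observatory.HAN.feeds.temperatures.Channel 1 T'
#     'HAN 2': 'observatory.HAN.feeds.temperatures.Channel 2 T'
#
# With such a file, a call could look like:
#
#   data = load_range('2020-02-19 18:48', '2020-02-22', config='hk_fields.yaml')
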

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(
        usage='This demo can be used to plot data from SO HK files.')
    parser.add_argument('--field')
    parser.add_argument('files', nargs='+', help=
                        "SO Housekeeping files to load.")
    args = parser.parse_args()

    # Scan the file set.
    hkas = HKArchiveScanner()
    for filename in args.files:
        print(f'Scanning {filename}...')
        hkas.process_file(filename)

    print(' ... finalizing.')
    cat = hkas.finalize()

    # Get list of fields, timelines, spanning all times:
    fields, timelines = cat.get_fields()

    # Show
    for field_name in sorted(fields):
        group_name = fields[field_name]['timeline']
        print(field_name, timelines[group_name]['interval'])

    if args.field is None:
        full_name = list(fields.keys())[0]
        print('Pretending interest in:', full_name)
    else:
        full_name = args.field
        print('User requests:', full_name)

    # Identify the shortest form of this field that also works.
    f_toks = full_name.split('.')
    field_name = full_name
    for i in range(1, 2**len(f_toks)):
        short_name = '.'.join([t for b,t in enumerate(f_toks) if (i >> b) & 1])
        try:
            fields, timelines = cat.get_data([short_name], short_match=True)
        except Exception:
            continue
        if len(short_name) < len(field_name):
            field_name = short_name
            print(field_name)

    print('Name shortened to:', field_name)

    # This is the awkward way, which preserves co-sampled
    # relationships (and is thus annoying to decode in simple cases).
    fields, timelines = cat.get_data([field_name], short_match=True)
    x0 = list(timelines.values())[0]['t']
    y0 = fields[field_name]

    # This is the easy way, which just gives you one timeline per
    # requested field.
    x1, y1 = cat.simple(field_name)

    assert np.all(np.array(x0) == x1) and np.all(np.array(y0) == y1)

    import pylab as pl
    pl.plot(x1, y1)
    pl.title(field_name)
    pl.show()