simonsobs / so3g / 5893442820

17 Aug 2023 04:21PM UTC. Coverage: 47.864% (remained the same).

push · github · web-flow

Merge pull request #153 from simonsobs/quiet-logs

Suppress hk logger.info messages

3 of 3 new or added lines in 2 files covered. (100.0%)

1221 of 2551 relevant lines covered (47.86%)

0.48 hits per line

Source File: /python/hk/getdata.py (64.09% covered)
"""Interface for querying and loading contiguous vectors from
Housekeeping frame streams.  We want to expose a sisock-compatible
API, where time ranges are an essential part of every query.

Use HKArchiveScanner to scan a set of G3 files containing Housekeeping
frames.  Use the resulting HKArchive to perform random I/O over a
sisock-style API.

"""
import itertools
import os
import pytz
import yaml
import logging
import pickle

import numpy as np
import datetime as dt


import so3g
from spt3g import core


hk_logger = logging.getLogger(__name__)
hk_logger.setLevel(logging.INFO)

SPAN_BUFFER_SECONDS = 1.0

def is_sub_seq(full_seq, sub_seq):
    """Return true if sub_seq is a sub-sequence of full_seq.

    """
    if len(sub_seq) == 0:
        return True
    for idx0,x0 in enumerate(full_seq):
        if x0 == sub_seq[0]:
            if is_sub_seq(full_seq[idx0+1:], sub_seq[1:]):
                return True
    return False


_SCHEMA_V1_BLOCK_TYPES = [
    core.G3VectorDouble,
    core.G3VectorInt,
    core.G3VectorString,
    core.G3VectorBool,
]


def _concat_hk_stream(blocks_in):
    """Concatenates an ordered list of compatible HK blocks into a single
    frame.  Each block should be a valid G3TimesampleMap with the same
    keys.

    Returns a single G3TimesampleMap with all fields concatenated.

    """
    blk = core.G3TimesampleMap()
    blk.times = core.G3VectorTime(blocks_in[0].times)
    fields = list(blocks_in[0].keys())
    for f in fields:
        f_short = f.split('.')[-1]
        blk[f] = blocks_in[0][f_short]
    for b in blocks_in[1:]:
        blk.times.extend(b.times)
    for f in fields:
        f_short = f.split('.')[-1]
        for _type in _SCHEMA_V1_BLOCK_TYPES:
            if isinstance(blocks_in[0][f_short], _type):
                break
        else:
            raise RuntimeError('Field "%s" is of unsupported type %s.' %
                               (f_short, type(blocks_in[0][f_short])))
        for b in blocks_in[1:]:
            blk[f].extend(b[f_short])
    return blk


class HKArchive:
    """Container for information necessary to determine what data fields
    are present in a data archive at what times.  This object has
    methods that can determine what fields have data over a given time
    range, and can group fields that share a timeline (i.e. are
    co-sampled) over that range.

    """
    field_groups = None
    def __init__(self, field_groups=None):
        if field_groups is not None:
            self.field_groups = list(field_groups)
        # A translator is used to update frames, on the fly, to the
        # modern schema assumed here.
        self.translator = so3g.hk.HKTranslator()

    def _get_groups(self, fields=None, start=None, end=None,
                    short_match=False):
        """Helper for get_fields and get_data.  Determines which fields, of
        those listed in fields, are present in the archive between the
        specified times.

        Args:
            fields (list of strings): List of field names.  If None,
              all fields are considered.
            start (timestamp): Earliest time to consider.  If None,
              consider arbitrarily early times.
            end (timestamp): Latest time to consider.  If None,
              consider arbitrarily late times.  Note that ``start``
              and ``end`` form a semi-closed interval that includes
              start but excludes end.
            short_match (bool): If True, then a requested field will
              be considered to match an archive field if its
              "."-tokenized form is a sub-sequence of the
              "."-tokenized field in the archive.  For example, the
              archive field "obs.agentX.feeds.therms.c1" may be
              matched by the requested field "agentX.c1".  In the case
              that multiple archive fields match a requested field, a
              ValueError is thrown.

        Returns:
            List of (group_name, group_fields, fgs).  The
            ``group_name`` is internally generated... probably
            something like "group0".  ``group_fields`` is a list of
            fields belonging to this group.  ``fgs`` is a list of
            _FieldGroup objects relevant to the fields in this
            group.

        Notes:
            Each entry in the returned list carries information for a
            set of fields that are co-sampled over the requested time
            interval.  Each of the requested fields will thus appear
            in at most one group.

            If a requested field is not found, it will not be listed
            in any group.

        """
        span = so3g.IntervalsDouble()
        if start is None:
            start = span.domain[0]
        if end is None:
            end = span.domain[1]
        span.add_interval(start, end)

        if short_match and (fields is not None):
            field_seqs = [f.split('.') for f in fields]
            short_match_map = {}  # map from shortened name to full field name.

        field_map = {}
        for fg in self.field_groups:
            both = span * fg.cover
            if len(both.array()) > 0:
                for f in fg.fields:
                    full_field = fg.prefix + '.' + f
                    key_field = full_field
                    if fields is not None:
                        # User is interested only in particular fields.
                        if short_match:
                            for seq in field_seqs:
                                if is_sub_seq(full_field.split('.'), seq):
                                    key_field = '.'.join(seq)
                                    prev_short_match = short_match_map.get(key_field)
                                    if prev_short_match not in [None, full_field]:
                                        raise ValueError("Multiple matches for soft key: %s [%s, %s]." %
                                                         (key_field, prev_short_match, full_field))
                                    short_match_map[key_field] = full_field
                                    break
                            else:
                                continue
                        elif full_field not in fields:
                            continue
                    if not key_field in field_map:
                        field_map[key_field] = [fg]
                    else:
                        field_map[key_field].append(fg)

        # Sort each list of field_groups by object id -- all we care
        # about is whether two fields have the same field group set.
        [f.sort(key=lambda obj: id(obj)) for f in field_map.values()]

        # Now group together fields if they have identical
        # field_group lists (because when they do, they can share a
        # timeline).
        keys = list(field_map.keys())
        grouped = []
        i0, i1 = 0, 1
        while i0 < len(keys):
            while i1 < len(keys) and field_map[keys[i0]] == field_map[keys[i1]]:
                i1 += 1
            group_keys = keys[i0:i1]
            group_name = 'group%i' % len(grouped)
            grouped.append((group_name, group_keys, field_map[group_keys[0]]))
            i0, i1 = i1, i1+1
        return grouped

    def get_fields(self, start=None, end=None):
        """Get list of fields that might have a sample in the time interval
        [start,end).

        Returns the pair of dictionaries ``(fields, timelines)``.

        The ``fields`` dictionary is a map from field name to a block
        of field information.  The ``timelines`` dictionary is a map
        from timeline name to a block of timeline information.

        """
        grouped = self._get_groups(None, start, end)
        # Construct the fields and timelines dicts.
        field_map, timeline_map = {}, {}
        for (group_name, fields, data) in grouped:
            timeline_map[group_name] = {'interval': None, 'field': fields}
            for f in fields:
                field_map[f] = {
                    'description': None,
                    'timeline': group_name,
                    'type': 'number',
                    'units': None,
                }
        return field_map, timeline_map

    def get_data(self, field=None, start=None, end=None, min_stride=None,
                 raw=False, short_match=False):
        """Load data from specified field(s) between specified times.

        Arguments ``field``, ``start``, ``end``, ``short_match`` are
        as described in _get_groups.

        Arguments:
            min_stride (float): Specifies the minimum sample spacing,
                in seconds.  Ignored in this implementation.
            raw (bool): If true, return G3 blocks instead of numpy
                arrays.

        Returns:
            Pair of dictionaries, (data, timelines).  The ``data``
            dictionary is a simple map from field name to a numpy
            array of readings.  The ``timelines`` dictionary is a map
            from field group name to a dictionary of timeline
            information, which has entries:

            - ``'t'``: numpy array of timestamps
            - ``'fields'``: list of fields belonging to this group.
            - ``'finalized_until'``: in cases where the data are still
              in flux, this field provides a timestamp that may be
              taken as the earliest time that needs to be requeried.
              This is part of the interface in order to support data
              streams that are being updated in real time.

            If user requested raw=True, then return value is a list
            of tuples of the form (group_name, block) where block is a
            single G3TimesampleMap carrying all the data for that
            co-sampled group.

        """
        grouped = self._get_groups(field, start, end, short_match=short_match)
        hk_logger.debug('get_data: _get_groups returns %i groups.' % len(grouped))

        # Pass through the metadata.  Collect information on what
        # field groups are present in what frames of what files; sort
        # that info by file and offset so we make a single monotonic
        # pass through the frames.
        group_info = {
            #  group_name: {'types': [dtype, ...],
            #               'fields': [(full_name, short_name), ...],
            #               'count': n},
            #  ...
        }
        files = {
            # filename: {
            #   offset: [(block_index, group_name, output_offset), ...],
            #   ...
            #   },
            # ...
        }

        def check_overlap(time_range):
            # Note the time_range is inclusive on both ends.
            return ((start is None or start <= time_range[1]) and
                    (end is None or end > time_range[0]))

        for group_name, fields, fgrps in grouped:
            # This is a group of co-sampled fields.  The fields share
            # a sample count and a frame-index map.
            all_frame_refs = []
            for fg in fgrps:
                all_frame_refs.extend(
                    [(b['time_range'], b['count'], b['filename'], b['byte_offset'], b['block_index'])
                     for b in fg.index_info])
            all_frame_refs.sort()
            vector_offset = 0
            for time_range, n, filename, byte_offset, block_index in all_frame_refs:
                if not check_overlap(time_range):
                    continue
                if not filename in files:
                    files[filename] = {}
                if byte_offset not in files[filename]:
                    files[filename][byte_offset] = []
                files[filename][byte_offset].append((block_index, group_name, vector_offset))
                vector_offset += n
            group_info[group_name] = {
                'count': vector_offset,
                'fields': [(f, f.split('.')[-1]) for f in fields],
                'types': [],
            }

        # Pass through the data.  Should read the files in order,
        # jumping monotonically through the needed frames.  The data
        # type of output arrays is determined from whatever
        # np.array(G3Object) returns on the first block.  Note strings
        # ('U') have to be handled differently because we can't know
        # the maximum string length from the first block.
        data = {}
        timelines = {}
        for filename, file_map in sorted(files.items()):
            hk_logger.debug('get_data: reading %s' % filename)
            reader = so3g.G3IndexedReader(filename)
            for byte_offset, frame_info in sorted(file_map.items()):
                # Seek and decode.
                hk_logger.debug('get_data: seeking to %i for %i block extractions' %
                                (byte_offset, len(frame_info)))
                reader.Seek(byte_offset)
                frames = reader.Process(None)
                assert(len(frames) == 1)
                frames = self.translator(frames[0])
                frame = frames[0]
                # Now expand all blocks.
                for block_index, group_name, offset in frame_info:
                    block = frame['blocks'][block_index]
                    gi = group_info[group_name]
                    if raw:
                        # Short-circuit if in raw mode, just collect all blocks.
                        if group_name not in data:
                            data[group_name] = {}
                            for field, f_short in gi['fields']:
                                data[group_name] = []
                        data[group_name].append(block)
                        continue
                    if group_name not in timelines:
                        # This block is init that happens only once per group.
                        timelines[group_name] = {
                            't_g3': np.zeros(gi['count'], dtype=np.int64),
                            'fields': [f for f,s in gi['fields']],
                        }
                        hk_logger.debug('get_data: creating group "%s" with %i fields'
                                        % (group_name, len(gi['fields'])))
                        # Determine data type of each field and create output arrays.
                        for field, f_short in gi['fields']:
                            dtype = np.array(block[f_short]).dtype
                            gi['types'].append(dtype)
                            if dtype.char == 'U':
                                data[field] = []
                            else:
                                data[field] = np.empty(gi['count'], dtype)
                            hk_logger.debug('get_data: field "%s" has type %s' % (
                                f_short, dtype))
                    # Copy in block data.
                    n = len(block.times)
                    # Note this is in G3 time units for now... fixed later.
                    timelines[group_name]['t_g3'][offset:offset+n] = block.times
                    for (field, f_short), dtype in zip(gi['fields'], gi['types']):
                        if dtype.char == 'U':
                            data[field].append((offset, list(map(str, block[f_short]))))
                        else:
                            # This is a relatively quick copy because
                            # of buffer pass-through from G3... don't
                            # hit the RHS with np.array!
                            data[field][offset:offset+n] = block[f_short]

        if raw:
            return [(group_name, _concat_hk_stream(data[group_name]))
                    for group_name, _, _ in grouped]

        # Finalize string fields.
        for group_name, fields, fgrps in grouped:
            gi = group_info[group_name]
            for (field, f_short), dtype in zip(gi['fields'], gi['types']):
                if dtype.char == 'U':
                    data[field] = np.array(list(itertools.chain(
                        *[x for i, x in sorted(data[field])])))
                    assert(len(data[field]) == gi['count'])

        # Scale out time units.
        for timeline in timelines.values():
            timeline['t'] = timeline.pop('t_g3') / core.G3Units.seconds

        # Restrict to only the requested time range.
        if start is not None or end is not None:
            for timeline in timelines.values():
                i0, i1 = 0, len(timeline['t'])
                if start is not None:
                    i0 = np.searchsorted(timeline['t'], start)
                if end is not None:
                    i1 = np.searchsorted(timeline['t'], end)
                sl = slice(i0, i1)
                timeline['t'] = timeline['t'][sl]
                for k in timeline['fields']:
                    data[k] = data[k][sl]

        # Mark last time
        for timeline in timelines.values():
            if len(timeline['t']):
                timeline['finalized_until'] = timeline['t'][-1]
            else:
                timeline['finalized_until'] = start if start is not None else 0.

        return (data, timelines)

    def simple(self, fields=None, start=None, end=None, min_stride=None,
               raw=False, short_match=True):
        """Load data from specified field(s) between specified times, and
        unpack the data for ease of use.  Use get_data if you want to
        preserve the co-sampling structure.

        Arguments ``fields``, ``start``, ``end``, ``short_match`` are
        as described in _get_groups.  However, ``fields`` can be a
        single string rather than a list of strings.

        Note that ``short_match`` defaults to True (which is not the
        case for get_data).

        Returns:
            List of pairs of numpy arrays (t,y) corresponding to each
            field in the ``fields`` list.  If ``fields`` is a string,
            a simple pair (t,y) is returned.  ``t`` and ``y`` are
            numpy arrays of equal length containing the timestamps and
            field readings, respectively.  In cases where two fields
            are co-sampled, the time vector will be the same object.

            In cases where there are no data for the requested field
            in the time range, a pair of length 0 float arrays is returned.

        """
        unpack = isinstance(fields, str)
        if unpack:
            fields = [fields]
        data, timelines = self.get_data(fields, start, end, min_stride, raw, short_match)
        output = {}
        fields_not_found = [f for f in fields]
        for timeline in timelines.values():
            # Make the array here, so that the same object is returned
            # for all fields in this group.
            _t = np.array(timeline['t'])
            for f in timeline['fields']:
                output[f] = (_t, np.array(data[f]))
                fields_not_found.remove(f)
        nothing = np.zeros((0,))
        for f in fields_not_found:
            output[f] = (nothing, nothing)
        output = [output[f] for f in fields]
        if unpack:
            output = output[0]
        return output


class _HKProvider:
    def __init__(self, prov_id, prefix):
        self.prov_id = prov_id
        self.prefix = prefix
        self.blocks = {}

    @classmethod
    def from_g3(cls, element):
        prov_id = element['prov_id'].value
        prefix = element['description'].value
        return cls(prov_id, prefix)


class HKArchiveScanner:
    """Consumes SO Housekeeping streams and creates a record of what fields
    cover what time ranges.  This can run as a G3Pipeline module, but
    will not be able to record stream indexing information in that
    case.  If it's populated through the process_file method, then
    index information (in the sense of filenames and byte offsets)
    will be stored.

    After processing frames, calling .finalize() will return an
    HKArchive that can be used to load data more efficiently.

    """
    def __init__(self, pre_proc_dir=None, pre_proc_mode=None):
        self.session_id = None
        self.providers = {}
        self.field_groups = []
        self.frame_info = []
        self.counter = -1
        self.translator = so3g.hk.HKTranslator()
        self.pre_proc_dir = pre_proc_dir
        self.pre_proc_mode = pre_proc_mode

    def __call__(self, *args, **kw):
        return self.Process(*args, **kw)

    def Process(self, f, index_info=None):
        """Processes a frame.  Only Housekeeping frames will be examined;
        other frames will simply be counted.  All frames are passed
        through unmodified.  The index_info will be stored along with
        a description of the frame's data; see the .process_file
        function.

        """
        self.counter += 1
        if index_info is None:
            index_info = {'counter': self.counter}

        f = self.translator(f)
        assert(len(f) == 1)
        f = f[0]

        if f.type == core.G3FrameType.EndProcessing:
            return [f]

        if f.type != core.G3FrameType.Housekeeping:
            return [f]

        vers = f.get('hkagg_version', 0)
        assert(vers == 2)

        if f['hkagg_type'] == so3g.HKFrameType.session:
            session_id = f['session_id']
            if self.session_id is not None:
                if self.session_id != session_id:
                    self.flush()  # Note that this sets self.session_id=None.
            if self.session_id is None:
                core.log_info('New HK Session id = %i, timestamp = %i' %
                              (session_id, f['start_time']), unit='HKScanner')
                self.session_id = session_id

        elif f['hkagg_type'] == so3g.HKFrameType.status:
            # If a provider has disappeared, flush its information into a
            # FieldGroup.
            prov_cands = [_HKProvider.from_g3(p) for p in f['providers']]
            to_flush = list(self.providers.keys())  # prov_ids...
            for p in prov_cands:
                if p.prov_id in to_flush:
                    to_flush.remove(p.prov_id) # no, don't.
                else:
                    self.providers[p.prov_id] = p
            for prov_id in to_flush:
                self.flush([prov_id])

        elif f['hkagg_type'] == so3g.HKFrameType.data:
            # Data frame -- merge info for this provider.
            prov = self.providers[f['prov_id']]
            representatives = prov.blocks.keys()

            for bidx, (bname, b) in enumerate(zip(f['block_names'], f['blocks'])):
                assert(isinstance(b, core.G3TimesampleMap))
                if bname not in prov.blocks:
                    prov.blocks[bname] = {'fields': list(b.keys()),
                                          'start': b.times[0].time / core.G3Units.seconds,
                                          'index_info': []}
                # To ensure that the last sample is actually included
                # in the semi-open intervals we use to track frames,
                # the "end" time has to be after the final sample.
                prov.blocks[bname]['end'] = b.times[-1].time / core.G3Units.seconds + SPAN_BUFFER_SECONDS
                ii = {'block_index': bidx,
                      'time_range': (b.times[0].time / core.G3Units.seconds,
                                     b.times[-1].time / core.G3Units.seconds),
                      'count': len(b.times)}
                ii.update(index_info)
                prov.blocks[bname]['index_info'].append(ii)

        else:
            core.log_warn('Weird hkagg_type: %i' % f['hkagg_type'],
                          unit='HKScanner')
        return [f]

    def flush(self, provs=None):
        """Convert any cached provider information into _FieldGroup
        information.  Delete the provider information.  This will be
        called automatically during frame processing if a provider
        session ends.  Once frame processing has completed, this
        function should be called, with no arguments, to flush any
        remaining provider sessions.

        Args:
            provs (list or None): list of provider identifiers to
              flush.  If None, flush all and also reset the
              self.session_id.

        Returns:
            None

        """
        if provs is None:
            provs = list(self.providers.keys())
            self.session_id = None
        for p in provs:
            prov = self.providers.pop(p)
            blocks = prov.blocks
            for k, info in blocks.items():
                fg = _FieldGroup(prov.prefix, info['fields'], info['start'],
                                 info['end'], info['index_info'])
                self.field_groups.append(fg)

    def finalize(self):
        """Finalize the processing by calling flush(), then return an
        HKArchive with all the _FieldGroup information from this
        scanning session.
        """
        self.flush()
        return HKArchive(self.field_groups)

    def process_file(self, filename, flush_after=True):
        """Process the file specified by ``filename`` using a G3IndexedReader.
        Each frame from the file is passed to self.Process, with the
        optional index_info argument set to a dictionary containing
        the filename and byte_offset of the frame.

        Internal data grouping will be somewhat cleaner if the
        multiple files from a single aggregator "session" are passed
        to this function in acquisition order.  In that case, call
        with flush_after=False.

        """
        reader = so3g.G3IndexedReader(filename)
        while True:
            info = {'filename': filename,
                    'byte_offset': reader.Tell()}
            frames = reader.Process(None)
            assert len(frames) <= 1
            if len(frames) == 0:
                break
            self.Process(frames[0], info)
        # Calling flush() here protects us against the odd case that
        # we process files from a single session in non-consecutive
        # order.  In that case the 'start' and 'end' times will get
        # messed up because we can't tell the stream has been
        # re-initialized.
        if flush_after:
            self.flush()


    def process_file_with_cache(self, filename):
        """Processes the file specified by ``filename`` using the process_file
        method above.  If self.pre_proc_dir is specified (not None), it
        will load pickled HKArchiveScanner objects and concatenate them
        with self instead of re-processing each frame, if the
        corresponding file exists.  If the pkl file does not exist, it
        processes the file and saves the result (in the pre_proc_dir) so
        it can be used in the future.  If self.pre_proc_dir is not
        specified, this becomes equivalent to process_file.
        """
        if self.pre_proc_dir is None:
            self.process_file(filename)
            return

        folder = os.path.basename(filename)[:5]
        path = os.path.join( self.pre_proc_dir, folder, os.path.basename(filename).replace(".g3",'.pkl') )

        if os.path.exists(path):
            with open(path, 'rb') as pkfl:
                hksc = pickle.load(pkfl)

        else:
            hksc = HKArchiveScanner()
            hksc.process_file(filename)
            # Make dirs if needed
            if not os.path.exists( os.path.dirname(path) ):
                os.makedirs( os.path.dirname(path) )
                if self.pre_proc_mode is not None:
                    os.chmod( os.path.dirname(path), self.pre_proc_mode )
            # Save pkl file
            with open(path, 'wb') as pkfl:
                pickle.dump(hksc, pkfl)
            if self.pre_proc_mode is not None:
                os.chmod( path, self.pre_proc_mode )

        self.field_groups += hksc.field_groups
        self.counter += hksc.counter


class _FieldGroup:
    """Container object for look-up information associated with a group of
    fields that share a timeline (i.e. a group of fields that are
    co-sampled over some requested time range).

    Attributes:
        prefix (str): Not used.
        fields (list of str): The field names.
        cover (IntervalsDouble): The time range (as unix timestamps)
          over which these fields have data.  This is expressed as an
          IntervalsDouble to enable the use of Intervals arithmetic.
          The range is created from the "start" and "end" arguments of
          the constructor.
        index_info (list of dict): Information that the consumer will
          use to locate and load the data efficiently.  The entries in
          the list represent time-ordered frames. See Notes.

    Notes:

      Each entry of index_info is a dict providing information about a
      single frame and block where data can be found.  The fields are:

      - 'filename' (str): The file in which the frame is located.
      - 'byte_offset' (int): The offset within the file at which to
        find the frame.
      - 'block_index' (int): The index of the block, within the
        corresponding frame, where the data are found.
      - 'count' (int): the number of samples in this block.
      - 'time_range' (tuple): (first time, last time), as unix
        timestamps.  Note the last time is the time of the last
        sample, not some time beyond that.
      - 'counter' (int): An index providing the order in which the
        frames were scanned.

    """
    def __init__(self, prefix, fields, start, end, index_info):
        self.prefix = prefix
        self.fields = list(fields)
        self.cover = so3g.IntervalsDouble().add_interval(start, end)
        self.index_info = index_info

    def __repr__(self):
        try:
            return '_FieldGroup("%s", %i fields, %i segments)' % (
                self.prefix, len(self.fields), len(self.index_info))
        except:
            return '_FieldGroup(<bad internal state!>)'


def to_timestamp(some_time, str_format=None):
    """Convert the argument to a unix timestamp.

    Args:
      some_time: If a datetime, it is converted to UTC timezone and
        then to a unix timestamp.  If int or float, the value is
        returned unprocessed.  If str, a date will be extracted based
        on a few trial date format strings.
      str_format: a format string (for strptime) to try, instead of
        the default(s).

    Returns:
        float: Unix timestamp corresponding to some_time.

    """
    if type(some_time) == dt.datetime:
        return some_time.astimezone(dt.timezone.utc).timestamp()
    if type(some_time) == int or type(some_time) == float:
        return some_time
    if type(some_time) == str:
        if str_format is not None:
            return to_timestamp(pytz.utc.localize( dt.datetime.strptime(some_time, str_format )))
        str_options = ['%Y-%m-%d', '%Y-%m-%d %H:%M', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f']
        for option in str_options:
            try:
                return to_timestamp(pytz.utc.localize( dt.datetime.strptime(some_time, option )))
            except:
                continue
        raise ValueError('Could not process string into date object, options are: {}'.format(str_options))

    raise ValueError('Type of date / time indication is invalid, accepts datetime, int, float, and string')

def load_range(start, stop, fields=None, alias=None,
               data_dir=None, config=None, pre_proc_dir=None, pre_proc_mode=None,
               strict=True):
    """Args:

      start: Earliest time to search for data (see note on time
        formats).
      stop: Latest time to search for data (see note on time formats).
      fields: Fields to return, if None, returns all fields.
      alias: If not None, must be a list of strings providing exactly
        one value for each entry in fields.
      data_dir: directory where all the ctime folders are.  If None,
        tries to use $OCS_DATA_DIR.
      config: filename of a .yaml file for loading data_dir / fields /
        alias
      pre_proc_dir: Place to store pickled HKArchiveScanners for g3
        files to speed up loading
      pre_proc_mode: Permissions (passed to os.chmod) to be used on
        dirs and pkl files in the pre_proc_dir. No chmod if None.
      strict: If False, log and skip missing fields rather than
        raising a KeyError.

    Returns:

      Dictionary with structure::

        {
            alias[i] : (time[i], data[i])
        }

      It will be masked to only have data between start and stop.

    Notes:

      The "start" and "stop" arguments accept a variety of formats,
      including datetime objects, unix timestamps, and strings (see
      the to_timestamp function).  In the case of datetime objects, you
      should set tzinfo=dt.timezone.utc explicitly if the system is
      not set to UTC time.

      Example usage::

        fields = [
            'observatory.HAN.feeds.temperatures.Channel 1 T',
            'observatory.HAN.feeds.temperatures.Channel 2 T',
        ]

        alias = [
            'HAN 1', 'HAN 2',
        ]

        start = dt.datetime(2020,2,19,18,48)
        stop = dt.datetime(2020,2,22)
        data = load_range(start, stop, fields=fields, alias=alias)

        plt.figure()
        for name in alias:
            plt.plot( data[name][0], data[name][1])

    """
    if config is not None:
        if not (data_dir is None and fields is None and alias is None):
            hk_logger.warning('''load_range has a config file - data_dir, fields, and alias are ignored''')
        with open(config, 'r') as f:
            setup = yaml.load(f, Loader=yaml.FullLoader)

        if 'data_dir' not in setup.keys():
            raise ValueError('load_range config file requires data_dir entry')
        data_dir = setup['data_dir']
        if 'field_list' not in setup.keys():
            raise ValueError('load_range config file requires field_list entry')
        fields = []
        alias = []
        for k in setup['field_list']:
            fields.append( setup['field_list'][k])
            alias.append( k )

    if data_dir is None and 'OCS_DATA_DIR' not in os.environ.keys():
        raise ValueError('if $OCS_DATA_DIR is not defined a data directory must be passed to getdata')
    if data_dir is None:
        data_dir = os.environ['OCS_DATA_DIR']

    hk_logger.debug('Loading data from {}'.format(data_dir))

    start_ctime = to_timestamp(start) - 3600
    stop_ctime = to_timestamp(stop) + 3600

    hksc = HKArchiveScanner(pre_proc_dir=pre_proc_dir)

    for folder in range( int(start_ctime/1e5), int(stop_ctime/1e5)+1):
        base = data_dir+'/'+str(folder)
        if not os.path.exists(base):
            hk_logger.debug('{} does not exist, skipping'.format(base))
            continue

        for file in sorted(os.listdir(base)):
            try:
                t = int(file[:-3])
            except:
                hk_logger.debug('{} does not have the right format, skipping'.format(file))
                continue
            if t >= start_ctime-3600 and t <= stop_ctime+3600:
                hk_logger.debug('Processing {}'.format(base+'/'+file))
                hksc.process_file_with_cache( base+'/'+file)

    cat = hksc.finalize()
    start_ctime = to_timestamp(start)
    stop_ctime = to_timestamp(stop)

    all_fields,_ = cat.get_fields()

    if fields is None:
        fields = all_fields
    if alias is not None:
        if len(alias) != len(fields):
            hk_logger.error('if provided, alias needs to be the length of fields')
    else:
        alias = fields

    # Single pass load.
    keepers = []
    for name, field in zip(alias, fields):
        if field not in all_fields:
            hk_logger.debug('`{}` not in available fields, skipping'.format(field))
            continue
        keepers.append((name, field))
    data = cat.simple([f for n, f in keepers],
                      start=start_ctime, end=stop_ctime)
    data = {name: data[i] for i, (name, field) in enumerate(keepers)}

    return data

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(
        usage='This demo can be used to plot data from SO HK files.')
    parser.add_argument('--field')
    parser.add_argument('files', nargs='+', help=
                        "SO Housekeeping files to load.")
    args = parser.parse_args()

    # Scan the file set.
    hkas = HKArchiveScanner()
    for filename in args.files:
        print(f'Scanning {filename}...')
        hkas.process_file(filename)

    print(' ... finalizing.')
    cat = hkas.finalize()

    # Get list of fields, timelines, spanning all times:
    fields, timelines = cat.get_fields()

    # Show
    for field_name in sorted(fields):
        group_name = fields[field_name]['timeline']
        print(field_name, timelines[group_name]['interval'])

    if args.field is None:
        full_name = list(fields.keys())[0]
        print('Pretending interest in:', full_name)
    else:
        full_name = args.field
        print('User requests:', full_name)

    # Identify the shortest form of this field that also works.
    f_toks = full_name.split('.')
    field_name = full_name
    for i in range(1, 2**len(f_toks)):
        short_name = '.'.join([t for b,t in enumerate(f_toks) if (i >> b) & 1])
        try:
            fields, timelines = cat.get_data([short_name], short_match=True)
        except Exception:
            continue
        if len(short_name) < len(field_name):
            field_name = short_name
            print(field_name)

    print('Name shortened to:', field_name)

    # This is the awkward way, which preserves co-sampled
    # relationships (and is thus annoying to decode in simple cases).
    fields, timelines = cat.get_data([field_name], short_match=True)
    x0 = list(timelines.values())[0]['t']
    y0 = fields[field_name]

    # This is the easy way, which just gives you one timeline per
    # requested field.
    x1, y1 = cat.simple(field_name)

    assert np.all(np.array(x0) == x1) and np.all(np.array(y0) == y1)

    import pylab as pl
    pl.plot(x1, y1)
    pl.title(field_name)
    pl.show()