• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desiutil / 6041958714

31 Aug 2023 08:08PM UTC coverage: 74.322% (-0.07%) from 74.392%
6041958714

Pull #199

github-actions

weaverba137
working on PR suggestions
Pull Request #199: Add units to a Table

152 of 152 new or added lines in 1 file covered. (100.0%)

2191 of 2948 relevant lines covered (74.32%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.03
/py/desiutil/annotate.py
1
# Licensed under a 3-clause BSD style license - see LICENSE.rst
2
# -*- coding: utf-8 -*-
3
"""
1✔
4
=================
5
desiutil.annotate
6
=================
7

8
Tools for adding units and comments to FITS files.
9
"""
10
import csv
1✔
11
import os
1✔
12
import sys
1✔
13
from argparse import ArgumentParser
1✔
14
import yaml
1✔
15
from astropy.io import fits
1✔
16
from astropy.table import Table, QTable
1✔
17
from astropy.units import UnitConversionError
1✔
18
from . import __version__ as desiutilVersion
1✔
19
from .log import get_logger, DEBUG
1✔
20

21

22
log = None
1✔
23

24

25
def find_column_name(columns, prefix=('unit', )):
1✔
26
    """Find the column that contains unit descriptions, or comments.
27

28
    Parameters
29
    ----------
30
    columns : iterable
31
        The column names from a CSV or similar file.
32
    prefix : :class:`tuple`, optional
33
        Search for a column matching one or more items in `prefix`.
34
        The search is case-insensitive.
35

36
    Returns
37
    -------
38
    :class:`int`
39
        The index of `columns` that matches.
40

41
    Raises
42
    ------
43
    IndexError
44
        If no match was found.
45
    """
46
    for i, column in enumerate(columns):
1✔
47
        for s in prefix:
1✔
48
            if s in column.lower():
1✔
49
                return i
1✔
50
    raise IndexError(f"No column matching '{prefix[0]}' found!")
1✔
51

52

53
def find_key_name(data, prefix=('unit', )):
1✔
54
    """
55
    Parameters
56
    ----------
57
    data : :class:`dict`
58
        A dictionary resulting from a parsed YAML file.
59
    prefix : :class:`tuple`, optional
60
        Search for a column matching one or more items in `prefix`.
61
        The search is case-insensitive.
62

63
    Returns
64
    -------
65
    :class:`str`
66
        The key of `data` that matches.
67

68
    Raises
69
    ------
70
    KeyError
71
        If no match was found.
72
    """
73
    for key in data.keys():
1✔
74
        for s in prefix:
1✔
75
            if s in key.lower():
1✔
76
                return key
1✔
77
    raise KeyError(f"No key matching '{prefix[0]}' found!")
1✔
78

79

80
def load_csv_units(filename):
1✔
81
    """Parse a CSV file that contains column names and units and optionally comments.
82

83
    Table column names are assumed to be in the first column of the CSV file.
84
    Any column with the name "Unit(s)" (case-insensitive) is assumed to contain FITS-style units.
85
    Any column with the name "Comment(s)" (case-insensitive) or "Description(s)" is assumed to be the comment.
86

87
    Parameters
88
    ----------
89
    filename : :class:`str` or :class:`pathlib.Path`
90
        Read column definitions from `filename`.
91

92
    Returns
93
    -------
94
    :class:`tuple`
95
        A tuple containing two :class:`dict` objects for units and comments.
96
        If no comments are detected, the comments :class:`dict` will be empty.
97

98
    Raises
99
    ------
100
    ValueError
101
        If `filename` does not at least contain a "unit" column.
102
    """
103
    units = dict()
1✔
104
    comments = dict()
1✔
105
    header = None
1✔
106
    data = list()
1✔
107
    log.debug("filename = '%s'", filename)
1✔
108
    with open(filename, newline='') as f:
1✔
109
        reader = csv.reader(f)
1✔
110
        for row in reader:
1✔
111
            if header is None:
1✔
112
                header = row
1✔
113
            else:
114
                data.append(row)
1✔
115
    log.debug(header)
1✔
116
    try:
1✔
117
        u = find_column_name(header)
1✔
118
    except IndexError:
1✔
119
        raise ValueError(f"{filename} does not have a unit column!")
1✔
120
    try:
1✔
121
        c = find_column_name(header, prefix=('comment', 'description'))
1✔
122
    except IndexError:
1✔
123
        c = None
1✔
124
    for row in data:
1✔
125
        log.debug("units['%s'] = '%s'", row[0], row[u])
1✔
126
        units[row[0]] = row[u]
1✔
127
        if c:
1✔
128
            log.debug("comments['%s'] = '%s'", row[0], row[c])
1✔
129
            comments[row[0]] = row[c]
1✔
130
    return (units, comments)
1✔
131

132

133
def load_yml_units(filename):
1✔
134
    """Parse a YAML file that contains column names and units and optionally comments.
135

136
    The YAML file should contain a dictionary with a keyword like 'units' and,
137
    optionally, a keyword like 'comments' or 'description.
138

139
    For backwards-compatibility, the YAML file can be simply a dictionary
140
    containing column names.
141

142
    Parameters
143
    ----------
144
    filename : :class:`str` or :class:`pathlib.Path`
145
        Read column definitions from `filename`.
146

147
    Returns
148
    -------
149
    :class:`tuple`
150
        A tuple containing two :class:`dict` objects for units and comments.
151
        If no comments are detected, the comments :class:`dict` will be empty.
152
    """
153
    comments = dict()
1✔
154
    # log.debug("y = yaml.safe_load('%s')", filename)
155
    with open(filename, newline='') as f:
1✔
156
        y = yaml.safe_load(f)
1✔
157
    try:
1✔
158
        u = find_key_name(y)
1✔
159
    except KeyError:
1✔
160
        log.warning(f"{filename} does not have a unit column, assuming keys are columns!")
1✔
161
        u = None
1✔
162
    try:
1✔
163
        c = find_key_name(y, prefix=('comment', 'description'))
1✔
164
    except KeyError:
1✔
165
        c = None
1✔
166
    if u:
1✔
167
        units = y[u]
1✔
168
    else:
169
        units = y
1✔
170
    if c:
1✔
171
        comments = y[c]
1✔
172
    return (units, comments)
1✔
173

174

175
def annotate_table(table, units, inplace=False):
1✔
176
    """Add annotations to `table`.
177

178
    Parameters
179
    ----------
180
    table : :class:`astropy.table.Table` or :class:`astropy.table.QTable`
181
        A data table.
182
    units : :class:`dict`
183
        Mapping of table columns to units.
184
    inplace : :class:`bool`, optional
185
        If ``True``, modify `table` directly instead of returning a copy.
186

187
    Returns
188
    -------
189
    :class:`astropy.table.Table`
190
        An updated version of `table`.
191

192
    Notes
193
    -----
194
    Currently :class:`~astropy.table.Table` does not support the concept
195
    of column-specific comments, especially in a way where the comments
196
    could be written to a file. If this ever changes, this function could
197
    be extended to add comments.
198
    """
199
    if inplace:
1✔
200
        t = table
1✔
201
    else:
202
        if isinstance(table, QTable):
1✔
203
            t = QTable(table)  # copy=True is the default.
1✔
204
        else:
205
            t = Table(table)
1✔
206
    for colname in t.colnames:
1✔
207
        if colname not in units:
1✔
208
            log.info("Column '%s' not found in units argument.", colname)
1✔
209
    for column in units:
1✔
210
        if column in t.colnames:
1✔
211
            if len(units[column]) > 0:
1✔
212
                try:
1✔
213
                    log.debug("t['%s'].unit = '%s'", column, units[column])
1✔
214
                    t[column].unit = units[column]
1✔
215
                except AttributeError:
1✔
216
                    #
217
                    # Can't change .unit if it is already set. Try to convert.
218
                    #
219
                    try:
1✔
220
                        log.debug("t.replace_column('%s', t['%s'].to('%s'))", column, column, units[column])
1✔
221
                        t.replace_column(column, t[column].to(units[column]))
1✔
222
                    except UnitConversionError:
1✔
223
                        log.error("Cannot add or replace unit '%s' to column '%s'!", units[column], column)
1✔
224
            else:
225
                log.debug("Not setting blank unit for column '%s'.", column)
1✔
226
        else:
227
            log.debug("Column '%s' not present in table.", column)
1✔
228
    return t
1✔
229

230

231
def annotate(filename, extension, units=None, comments=None):
1✔
232
    """Add annotations to `filename`.
233

234
    If `units` or `comments` is an empty dictionary, it will be ignored.
235

236
    Parameters
237
    ----------
238
    filename : :class:`str`
239
        Name of FITS file.
240
    extension : :class:`str` or :class:`int
241
        Name or number of extension in `filename`.
242
    units : :class:`dict`, optional
243
        Mapping of table columns to units.
244
    comments : :class:`dict`, optional
245
        Mapping of table columns to comments.
246

247
    Returns
248
    -------
249
    :class:`astropy.io.fits.HDUList`
250
        An updated version of the file.
251
    """
252
    new_hdus = list()
×
253
    with fits.open(filename, mode='readonly', memmap=False, lazy_load_hdus=False, uint=False, disable_image_compression=True, do_not_scale_image_data=True, character_as_bytes=True, scale_back=True) as hdulist:
×
254
        log.debug(hdulist._open_kwargs)
×
255
        kwargs = hdulist._open_kwargs.copy()
×
256
        for h in hdulist:
×
257
            hc = h.copy()
×
258
            if hasattr(h, '_do_not_scale_image_data'):
×
259
                hc._do_not_scale_image_data = h._do_not_scale_image_data
×
260
            if hasattr(h, '_bzero'):
×
261
                hc._bzero = h._bzero
×
262
            if hasattr(h, '_bscale'):
×
263
                hc._bzero = h._bscale
×
264
            if hasattr(h, '_scale_back'):
×
265
                hc._scale_back = h._scale_back
×
266
            if hasattr(h, '_uint'):
×
267
                hc._uint = h._uint
×
268
            #
269
            # Work around header comments not copied for BinTableHDU.
270
            #
271
            if isinstance(h, fits.BinTableHDU):
×
272
                for key in h.header.keys():
×
273
                    hc.header.comments[key] = h.header.comments[key]
×
274
            #
275
            # Work around disappearing BZERO and BSCALE keywords.
276
            #
277
            if isinstance(h, fits.ImageHDU) and 'BZERO' in h.header and 'BSCALE' in h.header:
×
278
                if 'BZERO' not in hc.header or 'BSCALE' not in hc.header:
×
279
                    iscale = h.header.index('BSCALE')
×
280
                    izero = h.header.index('BZERO')
×
281
                    if izero > iscale:
×
282
                        hc.header.insert(iscale - 1, ('BSCALE', h.header['BSCALE'], h.header.comments['BSCALE']), after=True)
×
283
                        hc.header.insert(iscale, ('BZERO', h.header['BZERO'], h.header.comments['BZERO']), after=True)
×
284
                    else:
285
                        hc.header.insert(izero - 1, ('BZERO', h.header['BZERO'], h.header.comments['BZERO']), after=True)
×
286
                        hc.header.insert(izero, ('BSCALE', h.header['BSCALE'], h.header.comments['BSCALE']), after=True)
×
287
            new_hdus.append(hc)
×
288
    new_hdulist = fits.HDUList(new_hdus)
×
289
    new_hdulist._open_kwargs = kwargs
×
290
    log.debug(new_hdulist._open_kwargs)
×
291
    try:
×
292
        ext = int(extension)
×
293
    except ValueError:
×
294
        ext = extension
×
295
    try:
×
296
        hdu = new_hdulist[ext]
×
297
    except (IndexError, KeyError):
×
298
        raise
×
299
    return new_hdulist
×
300

301

302
def _options():
1✔
303
    """Parse command-line options.
304
    """
305
    parser = ArgumentParser(description="Add units or comments to a FITS file.",
1✔
306
                            prog=os.path.basename(sys.argv[0]))
307
    parser.add_argument('-c', '--comments', action='store', dest='comments', metavar='COMMENTS',
1✔
308
                        help="COMMENTS should have the form COLUMN='comment':COLUMN='comment'.")
309
    parser.add_argument('-C', '--csv', action='store', dest='csv', metavar='CSV',
1✔
310
                        help="Read annotations from CSV file.")
311
    parser.add_argument('-e', '--extension', dest='extension', action='store', metavar='EXT', default='1',
1✔
312
                        help="Update FITS extension EXT, which can be a number or an EXTNAME. If not specified, HDU 1 will be updated, which is standard for simple binary tables.")
313
    parser.add_argument('-o', '--overwrite', dest='overwrite', action='store_true',
1✔
314
                        help='Overwrite the input FITS file.')
315
    parser.add_argument('-t', '--test', dest='test', action='store_true',
1✔
316
                        help='Test mode; show what would be done but do not change any files.')
317
    parser.add_argument('-u', '--units', action='store', dest='units', metavar='UNITS',
1✔
318
                        help="UNITS should have the form COLUMN='unit':COLUMN='unit'.")
319
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
1✔
320
                        help='Print extra debugging information.')
321
    parser.add_argument('-V', '--version', action='version', version='%(prog)s ' + desiutilVersion)
1✔
322
    parser.add_argument('-Y', '--yaml', action='store', dest='yaml', metavar='YAML',
1✔
323
                        help="Read annotations from YAML file.")
324
    parser.add_argument('fits', metavar='INPUT_FITS', help='FITS file to modify.')
1✔
325
    parser.add_argument('output', metavar='OUTPUT_FITS', nargs='?',
1✔
326
                        help='Write to new FITS file. If --overwrite is specified, this value is ignored.')
327
    options = parser.parse_args()
1✔
328
    return options
1✔
329

330

331
def main():
332
    """Entry-point for command-line scripts.
333

334
    Returns
335
    -------
336
    :class:`int`
337
        An integer suitable for passing to :func:`sys.exit`.
338
    """
339
    global log
340
    options = _options()
341
    if options.test or options.verbose:
342
        log = get_logger(DEBUG)
343
    else:
344
        log = get_logger()
345
    if options.csv:
346
        units, comments = load_csv_units(options.csv)
347
    elif options.yaml:
348
        units, comments = load_yml_units(options.yaml)
349
    else:
350
        if options.units:
351
            units = dict(tuple(c.split('=')) for c in options.units.split(':'))
352
        else:
353
            log.warning("No units have been specified!")
354
            units = dict()
355
        if options.comments:
356
            comments = dict(tuple(c.split('=')) for c in options.comments.split(':'))
357
        else:
358
            log.debug("No comments have been specified.")
359
            comments = dict()
360
    log.debug("units = %s", units)
361
    log.debug("comments = %s", comments)
362
    hdulist = annotate(options.fits, options.extension, units, comments)
363
    if options.overwrite and options.output:
364
        output = options.output
365
    elif options.overwrite:
366
        output = options.fits
367
    elif options.output:
368
        output = options.output
369
    else:
370
        log.error("--overwrite not specified and no output file specified!")
371
        return 1
372
    try:
373
        hdulist.writeto(output, output_verify='warn', overwrite=options.overwrite, checksum=False)
374
    except OSError as e:
375
        if 'overwrite' in e.args[0]:
376
            log.error("Output file exists and --overwrite was not specified!")
377
        else:
378
            log.error(e.args[0])
379
        return 1
380
    return 0
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc