• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desiutil / 6279824009

22 Sep 2023 10:48PM UTC coverage: 75.948% (+1.6%) from 74.314%
6279824009

Pull #201

github-actions

Stephen Bailey
fix tests
Pull Request #201: Migrate unit validation to desiutil; support adding units to FITS files.

70 of 70 new or added lines in 1 file covered. (100.0%)

2264 of 2981 relevant lines covered (75.95%)

0.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.46
/py/desiutil/annotate.py
1
# Licensed under a 3-clause BSD style license - see LICENSE.rst
2
# -*- coding: utf-8 -*-
3
"""
1✔
4
=================
5
desiutil.annotate
6
=================
7

8
Tools for adding units and comments to FITS files.
9
"""
10
import csv
1✔
11
import os
1✔
12
import sys
1✔
13
from argparse import ArgumentParser
1✔
14
from warnings import warn
1✔
15
import yaml
1✔
16
from astropy.io import fits
1✔
17
from astropy.table import Table, QTable
1✔
18
from astropy.units import Unit, UnitConversionError
1✔
19
from . import __version__ as desiutilVersion
1✔
20
from .log import get_logger, DEBUG
1✔
21

22

23
class FITSUnitWarning(UserWarning):
1✔
24
    """Warnings related to invalid FITS units.
25
    """
26
    pass
1✔
27

28

29
def find_column_name(columns, prefix=('unit', )):
1✔
30
    """Find the column that contains unit descriptions, or comments.
31

32
    Parameters
33
    ----------
34
    columns : iterable
35
        The column names from a CSV or similar file.
36
    prefix : :class:`tuple`, optional
37
        Search for a column matching one or more items in `prefix`.
38
        The search is case-insensitive.
39

40
    Returns
41
    -------
42
    :class:`int`
43
        The index of `columns` that matches.
44

45
    Raises
46
    ------
47
    IndexError
48
        If no match was found.
49
    """
50
    for i, column in enumerate(columns):
1✔
51
        for s in prefix:
1✔
52
            if s in column.lower():
1✔
53
                return i
1✔
54
    raise IndexError(f"No column matching '{prefix[0]}' found!")
1✔
55

56

57
def find_key_name(data, prefix=('unit', )):
1✔
58
    """
59
    Parameters
60
    ----------
61
    data : :class:`dict`
62
        A dictionary resulting from a parsed YAML file.
63
    prefix : :class:`tuple`, optional
64
        Search for a column matching one or more items in `prefix`.
65
        The search is case-insensitive.
66

67
    Returns
68
    -------
69
    :class:`str`
70
        The key of `data` that matches.
71

72
    Raises
73
    ------
74
    KeyError
75
        If no match was found.
76
    """
77
    for key in data.keys():
1✔
78
        for s in prefix:
1✔
79
            if s in key.lower():
1✔
80
                return key
1✔
81
    raise KeyError(f"No key matching '{prefix[0]}' found!")
1✔
82

83

84
def validate_unit(unit, error=False):
1✔
85
    """Check units for consistency with FITS standard, while allowing
86
    some special exceptions.
87

88
    Parameters
89
    ----------
90
    unit : :class:`str`
91
        The unit to parse.
92
    error : :class:`bool`, optional
93
        If ``True``, failure to interpret the unit raises an
94
        exception.
95

96
    Returns
97
    -------
98
    :class:`str`
99
        If a special exception is detected, the name of the unit
100
        is returned.  Otherwise, ``None``.
101

102
    Raises
103
    ------
104
    :exc:`ValueError`
105
        If `error` is set and `unit` can't be parsed.
106
    """
107
    if unit is None:
1✔
108
        return None
1✔
109
    acceptable_units = ('maggie', 'maggy', 'mgy',
1✔
110
                        'electron/Angstrom',
111
                        'log(Angstrom)')
112
    try:
1✔
113
        au = Unit(unit, format='fits')
1✔
114
    except ValueError as e:
1✔
115
        m = str(e)
1✔
116
        bad_unit = m.split()[0]
1✔
117
        if any([u in bad_unit for u in acceptable_units]) and 'Numeric factor' not in m:
1✔
118
            return bad_unit
1✔
119
        else:
120
            if error:
1✔
121
                raise
1✔
122
            else:
123
                warn(m, FITSUnitWarning)
1✔
124
    return None
1✔
125

126

127
def check_comment_length(comments, error=True):
1✔
128
    """Ensure keyword comments are short enough that they will not be truncated.
129

130
    By experiment, :mod:`astropy.io.fits` will truncate comments longer than
131
    46 characters, however the warning it emits when it does so is not very
132
    informative.
133

134
    Parameters
135
    ----------
136
    comments : :class:`dict`
137
        Mapping of table columns to comments.
138
    error : :class:`bool`, optional
139
        If ``False`` just warn about long comments instead of raising an exception.
140

141
    Returns
142
    -------
143
    :class:`int`
144
        The number of long comments detected, although the value is only relevant
145
        if `error` is ``False``.
146

147
    Raises
148
    ------
149
    ValueError
150
        If any comment is too long.
151
    """
152
    log = get_logger()
1✔
153
    too_long = 46
1✔
154
    n_long = 0
1✔
155
    for key in comments:
1✔
156
        if len(comments[key]) > too_long:
1✔
157
            n_long += 1
1✔
158
            if error:
1✔
159
                log.error("'%s' comment too long: '%s'!", key, comments[key])
1✔
160
            else:
161
                log.warning("Long comment detected for '%s', will be truncated to '%s'!",
1✔
162
                            key, comments[key][:too_long])
163
    if n_long > 0 and error:
1✔
164
        raise ValueError(f"{n_long:d} long comments detected!")
1✔
165
    return n_long
1✔
166

167

168
def load_csv_units(filename):
1✔
169
    """Parse a CSV file that contains column names and units and optionally comments.
170

171
    Table column names are assumed to be in the first column of the CSV file.
172
    Any column with the name "Unit(s)" (case-insensitive) is assumed to contain FITS-style units.
173
    Any column with the name "Comment(s)" (case-insensitive) or "Description(s)" is assumed to be the comment.
174

175
    Parameters
176
    ----------
177
    filename : :class:`str` or :class:`pathlib.Path`
178
        Read column definitions from `filename`.
179

180
    Returns
181
    -------
182
    :class:`tuple`
183
        A tuple containing two :class:`dict` objects for units and comments.
184
        If no comments are detected, the comments :class:`dict` will be empty.
185

186
    Raises
187
    ------
188
    ValueError
189
        If `filename` does not at least contain a "unit" column.
190
    """
191
    log = get_logger()
1✔
192
    units = dict()
1✔
193
    comments = dict()
1✔
194
    header = None
1✔
195
    data = list()
1✔
196
    log.debug("filename = '%s'", filename)
1✔
197
    with open(filename, newline='') as f:
1✔
198
        reader = csv.reader(f)
1✔
199
        for row in reader:
1✔
200
            if header is None:
1✔
201
                header = row
1✔
202
            else:
203
                data.append(row)
1✔
204
    log.debug(header)
1✔
205
    try:
1✔
206
        u = find_column_name(header)
1✔
207
    except IndexError:
1✔
208
        raise ValueError(f"{filename} does not have a unit column!")
1✔
209
    try:
1✔
210
        c = find_column_name(header, prefix=('comment', 'description'))
1✔
211
    except IndexError:
1✔
212
        c = None
1✔
213
    for row in data:
1✔
214
        log.debug("units['%s'] = '%s'", row[0], row[u])
1✔
215
        units[row[0]] = row[u]
1✔
216
        if c:
1✔
217
            log.debug("comments['%s'] = '%s'", row[0], row[c])
1✔
218
            comments[row[0]] = row[c]
1✔
219
    return (units, comments)
1✔
220

221

222
def load_yml_units(filename):
1✔
223
    """Parse a YAML file that contains column names and units and optionally comments.
224

225
    The YAML file should contain a dictionary with a keyword like 'units' and,
226
    optionally, a keyword like 'comments' or 'description.
227

228
    For backwards-compatibility, the YAML file can be simply a dictionary
229
    containing column names.
230

231
    Parameters
232
    ----------
233
    filename : :class:`str` or :class:`pathlib.Path`
234
        Read column definitions from `filename`.
235

236
    Returns
237
    -------
238
    :class:`tuple`
239
        A tuple containing two :class:`dict` objects for units and comments.
240
        If no comments are detected, the comments :class:`dict` will be empty.
241
    """
242
    log = get_logger()
1✔
243
    comments = dict()
1✔
244
    # log.debug("y = yaml.safe_load('%s')", filename)
245
    with open(filename, newline='') as f:
1✔
246
        y = yaml.safe_load(f)
1✔
247
    try:
1✔
248
        u = find_key_name(y)
1✔
249
    except KeyError:
1✔
250
        log.warning(f"{filename} does not have a unit column, assuming keys are columns!")
1✔
251
        u = None
1✔
252
    try:
1✔
253
        c = find_key_name(y, prefix=('comment', 'description'))
1✔
254
    except KeyError:
1✔
255
        c = None
1✔
256
    if u:
1✔
257
        units = y[u]
1✔
258
    else:
259
        units = y
1✔
260
    if c:
1✔
261
        comments = y[c]
1✔
262
    return (units, comments)
1✔
263

264

265
def annotate_table(table, units, inplace=False):
1✔
266
    """Add annotations to `table`.
267

268
    Parameters
269
    ----------
270
    table : :class:`astropy.table.Table` or :class:`astropy.table.QTable`
271
        A data table.
272
    units : :class:`dict`
273
        Mapping of table columns to units.
274
    inplace : :class:`bool`, optional
275
        If ``True``, modify `table` directly instead of returning a copy.
276

277
    Returns
278
    -------
279
    :class:`astropy.table.Table`
280
        An updated version of `table`.
281

282
    Notes
283
    -----
284
    Currently :class:`~astropy.table.Table` does not support the concept
285
    of column-specific comments, especially in a way where the comments
286
    could be written to a file. If this ever changes, this function could
287
    be extended to add comments.
288
    """
289
    log = get_logger()
1✔
290
    if inplace:
1✔
291
        t = table
1✔
292
    else:
293
        if isinstance(table, QTable):
1✔
294
            t = QTable(table)  # copy=True is the default.
1✔
295
        else:
296
            t = Table(table)
1✔
297
    for colname in t.colnames:
1✔
298
        if colname not in units:
1✔
299
            log.info("Column '%s' not found in units argument.", colname)
1✔
300
    for column in units:
1✔
301
        if column in t.colnames:
1✔
302
            if len(units[column]) > 0:
1✔
303
                try:
1✔
304
                    log.debug("t['%s'].unit = '%s'", column, units[column])
1✔
305
                    t[column].unit = units[column]
1✔
306
                except AttributeError:
1✔
307
                    #
308
                    # Can't change .unit if it is already set. Try to convert.
309
                    #
310
                    try:
1✔
311
                        log.debug("t.replace_column('%s', t['%s'].to('%s'))", column, column, units[column])
1✔
312
                        t.replace_column(column, t[column].to(units[column]))
1✔
313
                    except UnitConversionError:
1✔
314
                        log.error("Cannot add or replace unit '%s' to column '%s'!", units[column], column)
1✔
315
            else:
316
                log.debug("Not setting blank unit for column '%s'.", column)
1✔
317
        else:
318
            log.debug("Column '%s' not present in table.", column)
1✔
319
    return t
1✔
320

321

322
def annotate_fits(filename, extension, output, units=None, comments=None,
1✔
323
                  truncate=False, overwrite=False, verbose=False):
324
    """Add annotations to HDU `extension` in FITS file `filename`.
325

326
    HDU `extension` must be a :class:`astropy.io.fits.BinTableHDU`.
327

328
    If `units` or `comments` is an empty dictionary, it will be ignored.
329

330
    Parameters
331
    ----------
332
    filename : :class:`str`
333
        Name of FITS file.
334
    extension : :class:`str` or :class:`int`
335
        Name or number of extension in `filename`.
336
    output : :class:`str`
337
        Name of file to write to.
338
    units : :class:`dict`, optional
339
        Mapping of table columns to units.
340
    comments : :class:`dict`, optional
341
        Mapping of table columns to comments.
342
    truncate : :class:`bool`, optional
343
        Allow long comments to be truncated when written out. The default
344
        is to raise an error if a comment is too long.
345
    overwrite : :class:`bool`, optional
346
        Pass this keyword to :meth:`astropy.io.fits.HDUList.writeto`.
347
    verbose : :class:`bool`, optional
348
        Include debug-level logging
349

350
    Returns
351
    -------
352
    :class:`astropy.io.fits.HDUList`
353
        An updated version of the file, equivalent to the data in `output`.
354

355
    Raises
356
    ------
357
    IndexError
358
        If the HDU specified (numerically) by `extension` does not exist.
359
    KeyError
360
        If the HDU specified (as an ``EXTNAME``) by `extension` does not exist.
361
    TypeError
362
        If the HDU specified is not supported by this function.
363
    ValueError
364
        If neither `units` nor `comments` are specified.
365

366
    Notes
367
    -----
368
    * Due to the way :func:`astropy.io.fits.open` manages memory, changes
369
      have to be written out while `filename` is still open,
370
      hence the mandatory `output` argument.
371
    * A FITS HDU checksum will always be added to the output, even if it
372
      was not already present.
373
    """
374
    if verbose:
1✔
375
        log = get_logger(DEBUG)
×
376
    else:
377
        log = get_logger()
1✔
378

379
    try:
1✔
380
        ext = int(extension)
1✔
381
    except ValueError:
1✔
382
        ext = extension
1✔
383
    if not units and not comments:
1✔
384
        raise ValueError("No input units or comments specified!")
1✔
385
    if comments:
1✔
386
        n_long = check_comment_length(comments, error=(not truncate))
1✔
387
    with fits.open(filename, mode='readonly') as hdulist:
1✔
388
        new_hdulist = hdulist.copy()
1✔
389
        try:
1✔
390
            hdu = new_hdulist[ext]
1✔
391
        except (IndexError, KeyError):
1✔
392
            raise
1✔
393
        if isinstance(hdu, fits.BinTableHDU) and not isinstance(hdu, fits.CompImageHDU):
1✔
394
            #
395
            # fits.CompImageHDU is a subclass of fits.BinTableHDU.
396
            #
397
            for i in range(1, 1000):
1✔
398
                ttype = f"TTYPE{i:d}"
1✔
399
                if ttype not in hdu.header:
1✔
400
                    break
1✔
401
                colname = hdu.header[ttype]
1✔
402
                if comments and colname in comments and comments[colname].strip():
1✔
403
                    if hdu.header.comments[ttype].strip():
1✔
404
                        log.warning("Overriding comment on column '%s': '%s' -> '%s'.", colname, hdu.header.comments[ttype].strip(), comments[colname].strip())
1✔
405
                    hdu.header[ttype] = (colname, comments[colname].strip())
1✔
406
                    log.debug('Set %s comment to "%s"', colname, comments[colname].strip())
1✔
407
                if units and colname in units and units[colname].strip():
1✔
408
                    tunit = f"TUNIT{i:d}"
1✔
409
                    if tunit in hdu.header and hdu.header[tunit].strip():
1✔
410
                        log.warning("Overriding units for column '%s': '%s' -> '%s'.", colname, hdu.header[tunit].strip(), units[colname].strip())
1✔
411
                        hdu.header[tunit] = (units[colname].strip(), colname+' units')
1✔
412
                    else:
413
                        hdu.header.insert(f"TFORM{i:d}", (tunit, units[colname].strip(), colname+' units'), after=True)
1✔
414
                        log.debug('Set %s units to "%s"', colname, units[colname].strip())
1✔
415
        else:
416
            raise TypeError("Adding units to objects other than fits.BinTableHDU is not supported!")
1✔
417
        hdu.add_checksum()
1✔
418
        new_hdulist.writeto(output, output_verify='warn', overwrite=overwrite, checksum=False)
1✔
419
        log.info('Wrote %s', output)
1✔
420
    return new_hdulist
1✔
421

422

423
def _options():
1✔
424
    """Parse command-line options.
425
    """
426
    parser = ArgumentParser(description="Add units or comments to a FITS file.",
1✔
427
                            prog=os.path.basename(sys.argv[0]))
428
    parser.add_argument('-c', '--comments', action='store', dest='comments', metavar='COMMENTS',
1✔
429
                        help="COMMENTS should have the form COLUMN='comment':COLUMN='comment'.")
430
    parser.add_argument('-C', '--csv', action='store', dest='csv', metavar='CSV',
1✔
431
                        help="Read annotations from CSV file.")
432
    parser.add_argument('-D', '--disable-comments', action='store_true', dest='disable_comments',
1✔
433
                        help='Do not add comments, even if they are defined in one of the inputs.')
434
    parser.add_argument('-e', '--extension', dest='extension', action='store', metavar='EXT', default='1',
1✔
435
                        help="Update FITS extension EXT, which can be a number or an EXTNAME. If not specified, HDU 1 will be updated, which is standard for simple binary tables.")
436
    parser.add_argument('-o', '--overwrite', dest='overwrite', action='store_true',
1✔
437
                        help='Overwrite the input FITS file.')
438
    parser.add_argument('-T', '--truncate-comments', dest='truncate', action='store_true',
1✔
439
                        help='Allow any long comments to be truncated when written out.')
440
    parser.add_argument('-u', '--units', action='store', dest='units', metavar='UNITS',
1✔
441
                        help="UNITS should have the form COLUMN='unit':COLUMN='unit'.")
442
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
1✔
443
                        help='Print extra debugging information.')
444
    parser.add_argument('-V', '--version', action='version', version='%(prog)s ' + desiutilVersion)
1✔
445
    parser.add_argument('-Y', '--yaml', action='store', dest='yaml', metavar='YAML',
1✔
446
                        help="Read annotations from YAML file.")
447
    parser.add_argument('fits', metavar='INPUT_FITS', help='FITS file to modify.')
1✔
448
    parser.add_argument('output', metavar='OUTPUT_FITS', nargs='?',
1✔
449
                        help='Write to new FITS file. If --overwrite is specified, this value is ignored.')
450
    options = parser.parse_args()
1✔
451
    return options
1✔
452

453

454
def main():
455
    """Entry-point for command-line scripts.
456

457
    Returns
458
    -------
459
    :class:`int`
460
        An integer suitable for passing to :func:`sys.exit`.
461
    """
462
    options = _options()
463
    if options.verbose:
464
        log = get_logger(DEBUG)
465
    else:
466
        log = get_logger()
467
    if options.csv:
468
        units, comments = load_csv_units(options.csv)
469
    elif options.yaml:
470
        units, comments = load_yml_units(options.yaml)
471
    else:
472
        if options.units:
473
            units = dict(tuple(c.split('=')) for c in options.units.split(':'))
474
        else:
475
            log.warning("No units have been specified!")
476
            units = dict()
477
        if options.comments:
478
            comments = dict(tuple(c.split('=')) for c in options.comments.split(':'))
479
        else:
480
            log.debug("No comments have been specified.")
481
            comments = dict()
482
    if options.disable_comments:
483
        log.info("Comments are disabled by user request.")
484
        comments = dict()
485
    if units:
486
        log.debug("Input Units")
487
        for k, v in units.items():
488
            log.debug("'%s': '%s'", k, v)
489
    if comments:
490
        log.debug("Input Comments")
491
        for k, v in comments.items():
492
            log.debug("'%s': '%s'", k, v)
493
    if options.overwrite and options.output:
494
        output = options.output
495
    elif options.overwrite:
496
        output = options.fits
497
    elif options.output:
498
        output = options.output
499
    else:
500
        log.critical("--overwrite not specified and no output file specified!")
501
        return 1
502
    try:
503
        hdulist = annotate_fits(options.fits, options.extension, output,
504
                                units, comments, truncate=options.truncate,
505
                                overwrite=options.overwrite, verbose=options.verbose)
506
    except OSError as e:
507
        if 'overwrite' in e.args[0]:
508
            log.critical("Output file exists and --overwrite was not specified!")
509
        else:
510
            log.critical(e.args[0])
511
        return 1
512
    except (IndexError, KeyError, TypeError, ValueError) as e:
513
        log.critical(str(e))
514
        log.critical("Exiting without writing output.")
515
        return 1
516
    return 0
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc