• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desihub / desitarget / 10218914087

02 Aug 2024 04:31PM UTC coverage: 53.05%. Remained the same
10218914087

Pull #827

github

web-flow
Merge branch 'main' into ADM-col-gd1
Pull Request #827: Add GAIA_ prefix to PHOT_*_MEAN_MAG column names

0 of 2 new or added lines in 2 files covered. (0.0%)

2 existing lines in 1 file now uncovered.

7818 of 14737 relevant lines covered (53.05%)

0.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/py/desitarget/streams/io.py
1
"""
2
desitarget.streams.io
3
=====================
4

5
Reading/writing data for the MWS Stellar Stream programs.
6
"""
7
from time import time
×
8

9
import os
×
10
import fitsio
×
11
import numpy as np
×
12
import healpy as hp
×
13
import astropy.coordinates as acoo
×
14
import astropy.units as auni
×
15

16
from desitarget import io
×
17
from desitarget.geomask import pixarea2nside, add_hp_neighbors, sweep_files_touch_hp
×
18
from desitarget.gaiamatch import match_gaia_to_primary_post_dr3
×
19
from desitarget.targets import resolve
×
20
from desitarget.streams.utilities import betw, ivars_to_errors
×
21
from desitarget.internal import sharedmem
×
22

23
from desiutil import depend
×
24

25
# ADM set up the DESI default logger.
from desiutil.log import get_logger
log = get_logger()

# ADM the Legacy Surveys part of the data model for working with streams.
# ADM the array itself is empty; only its dtype is used (via .dtype.descr
# ADM and .dtype.names) to define output columns and their types.
streamcolsLS = np.array([], dtype=[
    ('RELEASE', '>i2'), ('BRICKID', '>i4'), ('TYPE', 'S4'),
    ('OBJID', '>i4'), ('RA', '>f8'), ('DEC', '>f8'), ('EBV', '>f4'),
    ('FLUX_G', '>f4'), ('FIBERTOTFLUX_G', '>f4'),
    ('FLUX_R', '>f4'), ('FIBERTOTFLUX_R', '>f4'),
    ('FLUX_Z', '>f4'), ('FIBERTOTFLUX_Z', '>f4'),
])

# ADM the Gaia part of the data model for working with streams.
# ADM again, only the dtype is used; field order is significant as it
# ADM sets the column order of the assembled output arrays.
streamcolsGaia = np.array([], dtype=[
    ('REF_EPOCH', '>f4'),  ('REF_ID', '>i8'),
    ('PARALLAX', '>f4'), ('PARALLAX_ERROR', '>f4'),
    ('PMRA', '>f4'), ('PMRA_ERROR', '>f4'),
    ('PMDEC', '>f4'), ('PMDEC_ERROR', '>f4'),
    ('ASTROMETRIC_PARAMS_SOLVED', '>i1'), ('NU_EFF_USED_IN_ASTROMETRY', '>f4'),
    ('PSEUDOCOLOUR', '>f4'), ('ECL_LAT', '>f8'), ('PHOT_G_MEAN_MAG', '>f4'),
    ('PHOT_BP_MEAN_MAG', '>f4'), ('PHOT_RP_MEAN_MAG', '>f4')
])

# ADM the Gaia Data Release for matching throughout this module.
gaiadr = "dr3"
×
51

52

53
def read_data_per_stream_one_file(filename, rapol, decpol, mind, maxd):
    """Assemble the data needed for a stream program from one file.

    Parameters
    ----------
    filename : :class:`str`
        Name of a Legacy Surveys sweep file.
    rapol, decpol : :class:`float`
        Pole in the stream coordinate system in DEGREES.
    mind, maxd : :class:`float` or `int`
        Minimum and maximum angular distance from the pole of the stream
        coordinate system to search for members in DEGREES.

    Returns
    -------
    :class:`array_like`
        An array of objects from `filename` that are in the stream,
        with matched Gaia information. Columns are the combination of
        the global `streamcolsLS` and `streamcolsGaia` data models.
    """
    objs = io.read_tractor(filename)

    # ADM coordinates of the stream.
    cstream = acoo.SkyCoord(rapol*auni.degree, decpol*auni.degree)
    cobjs = acoo.SkyCoord(objs["RA"]*auni.degree, objs["DEC"]*auni.degree)

    # ADM separation between the objects of interest and the stream.
    sep = cobjs.separation(cstream)

    # ADM only retain objects in the stream...
    ii = betw(sep.value, mind, maxd)

    # ADM ...at a declination of > -20o...
    ii &= objs["DEC"] > -20.

    # ADM ...that aren't very faint (> 22.5 mag in r).
    ii &= objs["FLUX_R"] > 1
    # ADM Also guard against negative fluxes in g/r.
    # ADM NOTE: these guards are deliberately NOT applied at present.
    # ii &= objs["FLUX_G"] > 0.
    # ii &= objs["FLUX_Z"] > 0.

    objs = objs[ii]

    # ADM limit to northern objects in northern imaging and southern
    # ADM objects in southern imaging.
    LSobjs = resolve(objs)

    # ADM set up an output array, and only retain critical columns
    # ADM from the global data model.
    data = np.zeros(len(LSobjs), dtype=streamcolsLS.dtype.descr +
                    streamcolsGaia.dtype.descr)

    # ADM catch the case where there are no objects meeting the cuts.
    if len(LSobjs) > 0:
        gaiaobjs = match_gaia_to_primary_post_dr3(LSobjs, matchrad=1., dr=gaiadr)

        # ADM to try and better resemble Gaia data, set zero
        # ADM magnitudes and proper motions to NaNs and change
        # ADM IVARs back to errors.
        gaiaobjs = ivars_to_errors(
            gaiaobjs, colnames=["PARALLAX_IVAR", "PMRA_IVAR", "PMDEC_IVAR"])

        for col in ["PHOT_G_MEAN_MAG", "PHOT_BP_MEAN_MAG", "PHOT_RP_MEAN_MAG",
                    "PARALLAX", "PMRA", "PMDEC"]:
            # ADM values within +/-1e-16 of zero indicate "unmeasured";
            # ADM flag them as NaN to resemble genuine Gaia data.
            ii = gaiaobjs[col] < 1e-16
            ii &= gaiaobjs[col] > -1e-16
            gaiaobjs[col][ii] = np.nan

        # ADM a (probably unnecessary) sanity check.
        assert len(gaiaobjs) == len(LSobjs)

        # ADM add data for the Legacy Surveys columns.
        for col in streamcolsLS.dtype.names:
            data[col] = LSobjs[col]
        # ADM add data for the Gaia columns.
        for col in streamcolsGaia.dtype.names:
            data[col] = gaiaobjs[col]

    return data
×
131

132

133
def read_data_per_stream(swdir, rapol, decpol, mind, maxd, stream_name,
                         readcache=True, addnors=True, test=False, numproc=1):
    """Assemble the data needed for a particular stream program.

    Parameters
    ----------
    swdir : :class:`str`
        Root directory of Legacy Surveys sweep files for a given data
        release for ONE of EITHER north or south, e.g.
        "/global/cfs/cdirs/cosmo/data/legacysurvey/dr9/south/sweep/9.0".
    rapol, decpol : :class:`float`
        Pole in the stream coordinate system in DEGREES.
    mind, maxd : :class:`float` or `int`
        Minimum and maximum angular distance from the pole of the stream
        coordinate system to search for members in DEGREES.
    stream_name : :class:`str`
        Name of a stream. Used to make the cached filename, e.g. "GD1".
    readcache : :class:`bool`
        If ``True`` read from a previously constructed and cached file
        automatically, IF such a file exists. If ``False`` don't read
        from the cache AND OVERWRITE the cached file, if it exists. The
        cached file is $TARG_DIR/streamcache/streamname-drX-cache.fits,
        where streamname is the lower-case passed `stream_name` and drX
        is the Legacy Surveys Data Release (parsed from `swdir`).
    addnors : :class:`bool`
        If ``True`` then if `swdir` contains "north" add sweep files from
        the south by substituting "south" in place of "north" (and vice
        versa, i.e. if `swdir` contains "south" add sweep files from the
        north by substituting "north" in place of "south").
    test : :class:`bool`
        Read a subset of the data for testing purposes.
    numproc : :class:`int`, optional, defaults to 1 for serial
        The number of parallel processes to use. `numproc` of 16 is a
        good balance between speed and file I/O.

    Returns
    -------
    :class:`array_like`
        The assembled objects in the stream, with matched Gaia
        information (the concatenated output of
        :func:`read_data_per_stream_one_file` across all relevant
        sweep files, or the contents of the cache file if it is read).

    Raises
    ------
    ValueError
        If `swdir` does not include a Data Release ("dr9", "dr10", etc.)
        or if `addnors` is passed but `swdir` contains neither "north"
        nor "south".

    Notes
    -----
    - Example values for, e.g., GD1:
        swdir = "/global/cfs/cdirs/cosmo/data/legacysurvey/dr9/south/sweep/9.0"
        rapol, decpol = 34.5987, 29.7331
        mind, maxd = 80, 100
    - The $TARG_DIR environment variable must be set to read/write from
      a cache. If $TARG_DIR is not set, caching is completely ignored.
    - This is useful for a single stream. The :func:`~read_data` function
      is likely a better choice for looping over the entire LS sweeps
      data when targeting multiple streams.
    """
    # ADM start the clock.
    start = time()

    # ADM check whether $TARG_DIR exists. If it does, agree to read from
    # ADM and write to the cache.
    writecache = True
    targdir = os.environ.get("TARG_DIR")
    if targdir is None:
        msg = "Set $TARG_DIR environment variable to use the cache!"
        log.info(msg)
        readcache = False
        writecache = False
    else:
        # ADM retrieve the data release from the passed sweep directory.
        dr = [i for i in swdir.split(os.sep) if "dr" in i]
        # ADM fail if this doesn't look like a standard sweep directory.
        if len(dr) != 1:
            msg = 'swdir not parsed: should include a construction like '
            msg += '"dr9" or "dr10"'
            raise ValueError(msg)
        cachefile = os.path.join(targdir, "streamcache",
                                 f"{stream_name.lower()}-{dr[0]}-cache.fits")

    # ADM if we have a cache, read it if requested and return the data.
    if readcache:
        if os.path.isfile(cachefile):
            objs = fitsio.read(cachefile, ext="STREAMCACHE")
            msg = f"Read {len(objs)} objects from {cachefile} cache file"
            log.info(msg)
            return objs
        else:
            msg = f"{cachefile} cache file doesn't exist. "
            msg += "Proceeding as if readcache=False"
            log.info(msg)

    # ADM read in the sweep files.
    infiles = io.list_sweepfiles(swdir)

    # ADM read both the north and south directories, if requested.
    if addnors:
        if "south" in swdir:
            swdir2 = swdir.replace("south", "north")
        elif "north" in swdir:
            swdir2 = swdir.replace("north", "south")
        else:
            msg = "addnors passed but swdir does not contain north or south!"
            raise ValueError(msg)
        infiles += io.list_sweepfiles(swdir2)

    # ADM calculate nside for HEALPixel of approximately 1o to limit
    # ADM number of sweeps files that need to be read.
    nside = pixarea2nside(1)

    # ADM determine RA, Dec of all HEALPixels at this nside.
    allpix = np.arange(hp.nside2npix(nside))
    theta, phi = hp.pix2ang(nside, allpix, nest=True)
    ra, dec = np.degrees(phi), 90-np.degrees(theta)

    # ADM only retain HEALPixels in the stream, based on mind and maxd.
    cpix = acoo.SkyCoord(ra*auni.degree, dec*auni.degree)
    cstream = acoo.SkyCoord(rapol*auni.degree, decpol*auni.degree)
    sep = cpix.separation(cstream)
    ii = betw(sep.value, mind, maxd)
    pixlist = allpix[ii]

    # ADM pad with neighboring pixels to ensure stream is fully covered.
    # ADM (the padded list was previously computed but never used, so
    # ADM sweep files that only touch the stream's edge could be missed).
    pixlist = add_hp_neighbors(nside, pixlist)

    # ADM determine which sweep files touch the relevant HEALPixels.
    filesperpixel, _, _ = sweep_files_touch_hp(nside, pixlist, infiles)
    infiles = list(np.unique(np.hstack([filesperpixel[pix] for pix in pixlist])))

    # ADM read a subset of the data for testing purposes, if requested.
    if test:
        msg = "Limiting data to first 20 files for testing purposes"
        log.info(msg)
        infiles = infiles[:20]

    def _read_data_per_stream_one_file(filename):
        """Determine the stream objects for a single sweep file"""
        return read_data_per_stream_one_file(filename, rapol, decpol, mind, maxd)

    # ADM shared counter of the number of files processed so far.
    nbrick = np.zeros((), dtype='i8')
    t0 = time()

    def _update_status(result):
        """wrapper for critical reduction operation on main parallel process"""
        if nbrick % 5 == 0 and nbrick > 0:
            elapsed = time() - t0
            rate = elapsed / nbrick
            log.info('{}/{} files; {:.1f} secs/file; {:.1f} total mins elapsed'
                     .format(nbrick, len(infiles), rate, elapsed/60.))

        nbrick[...] += 1
        return result

    # ADM parallel process sweep files, limit to objects in the stream.
    if numproc > 1:
        pool = sharedmem.MapReduce(np=numproc)
        with pool:
            allobjs = pool.map(_read_data_per_stream_one_file, infiles,
                               reduce=_update_status)
    else:
        allobjs = list()
        for fn in infiles:
            allobjs.append(_update_status(_read_data_per_stream_one_file(fn)))

    # ADM assemble all of the relevant objects.
    allobjs = np.concatenate(allobjs)
    log.info(f"Found {len(allobjs)} total objects...t={time()-start:.1f}s")

    # ADM if cache was passed and $TARG_DIR was set then write the data.
    if writecache:
        # ADM if the file doesn't exist we may need to make the directory.
        log.info(f"Writing cache to {cachefile}...t={time()-start:.1f}s")
        os.makedirs(os.path.dirname(cachefile), exist_ok=True)
        # ADM at least add the Gaia DR used to the header.
        hdr = fitsio.FITSHDR()
        hdr.add_record(dict(name="GAIADR", value=gaiadr,
                            comment="GAIA Data Release matched to"))
        io.write_with_units(cachefile, allobjs,
                            header=hdr, extname="STREAMCACHE")

    return allobjs
×
309

310

311
def write_targets(dirname, targs, header, streamnames="", obscon=None,
                  subpriority=True):
    """Write stream targets to a FITS file.

    Parameters
    ----------
    dirname : :class:`str`
        The output directory name. Filenames are constructed from other
        inputs.
    targs : :class:`~numpy.ndarray`
        The numpy structured array of data to write.
    header : :class:`dict`
        Header for output file. Can be a FITShdr object or dictionary.
        Pass {} if you have no additional header information.
    streamnames : :class:`str`, optional
        Information about stream names that correspond to the targets.
        Included in the output filename.
    obscon : :class:`str`, optional, defaults to `None`
        Can pass one of "DARK" or "BRIGHT". If passed, don't write the
        full set of data, rather only write targets appropriate for
        "DARK" or "BRIGHT" observing conditions. The relevant
        `PRIORITY_INIT` and `NUMOBS_INIT` columns will be derived from
        `PRIORITY_INIT_DARK`, etc. and `filename` will have "bright" or
        "dark" appended to the lowest DIRECTORY in the input `filename`.
    subpriority : :class:`bool`, optional, defaults to ``True``
        If ``True`` and a `SUBPRIORITY` column is in the input `targs`,
        then `SUBPRIORITY==0.0` entries are overwritten by a random float
        in the range 0 to 1, using a seed of 816.

    Returns
    -------
    :class:`int`
        The number of targets that were written to file.
    :class:`str`
        The name of the file to which targets were written.

    Notes
    -----
    - Must contain at least the columns:
        GAIA_PHOT_G_MEAN_MAG, GAIA_PHOT_BP_MEAN_MAG,
        GAIA_PHOT_RP_MEAN_MAG and
        FIBERTOTFLUX_G, FIBERTOTFLUX_R, FIBERTOTFLUX_Z, RELEASE
    - Always OVERWRITES existing files!
    - Writes atomically. Any output files that died mid-write will be
      appended by ".tmp".
    - Units are automatically added from the desitarget units yaml file
      (see `/data/units.yaml`).
    - Mostly wraps :func:`~desitarget.io.write_with_units`.
    """
    # ADM limit to just BRIGHT or DARK targets, if requested.
    # ADM Ignore the filename output, we'll build that on-the-fly.
    if obscon is not None:
        _, header, targs = io._bright_or_dark(dirname, header, targs, obscon)

    # ADM construct the output filename.
    # ADM RELEASE encodes the data release in its thousands digit(s).
    drs = list(set(targs["RELEASE"]//1000))
    if len(drs) == 1:
        drint = drs[0]
        drstr = f"dr{drint}"
    else:
        log.info("Couldn't parse LS data release. Defaulting to drX.")
        drint = "X"
        drstr = "drX"
    outfn = f"streamtargets-{streamnames.lower()}-bright.fits"
    outfn = os.path.join(dirname, drstr, io.desitarget_version,
                         "streamtargets", "main", "resolve", "bright", outfn)

    # ADM check if any targets are too bright.
    maglim = 15
    fluxlim = 10**((22.5-maglim)/2.5)
    toobright = np.zeros(len(targs), dtype="?")
    # ADM zero magnitudes/fluxes indicate missing data, so are excluded.
    for col in ["GAIA_PHOT_G_MEAN_MAG", "GAIA_PHOT_BP_MEAN_MAG",
                "GAIA_PHOT_RP_MEAN_MAG"]:
        toobright |= (targs[col] != 0) & (targs[col] < maglim)
    for col in ["FIBERTOTFLUX_G", "FIBERTOTFLUX_R", "FIBERTOTFLUX_Z"]:
        toobright |= (targs[col] != 0) & (targs[col] > fluxlim)
    if np.any(toobright):
        tids = targs["TARGETID"][toobright]
        log.warning(f"Targets TOO BRIGHT to be written to {outfn}: {tids}")
        # ADM remove the targets that are too bright.
        targs = targs[~toobright]

    # ADM populate SUBPRIORITY with a reproducible random float.
    if "SUBPRIORITY" in targs.dtype.names and subpriority:
        subpseed = 816
        np.random.seed(subpseed)
        # SB only set subpriorities that aren't already set, but keep
        # original full random sequence order.
        ii = targs["SUBPRIORITY"] == 0.0
        targs["SUBPRIORITY"][ii] = np.random.random(len(targs))[ii]
        header["SUBPSEED"] = subpseed

    # ADM add the DESI dependencies.
    depend.add_dependencies(header)
    # ADM some other useful header information.
    depend.setdep(header, 'desitarget', io.desitarget_version)
    depend.setdep(header, 'desitarget-git', io.gitversion())
    depend.setdep(header, 'photcat', drstr)

    # ADM add information to construct the filename to the header.
    header["OBSCON"] = "bright"
    header["SURVEY"] = "main"
    header["RESOLVE"] = True
    header["DR"] = drint
    header["GAIADR"] = gaiadr

    # ADM create necessary directories, if they don't exist.
    os.makedirs(os.path.dirname(outfn), exist_ok=True)
    # ADM and, finally, write out the targets.
    io.write_with_units(outfn, targs, extname="STREAMTARGETS", header=header)

    return len(targs), outfn
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc