igmhub / picca / 9971080695

17 Jul 2024 08:45AM UTC coverage: 66.931% (+0.2%) from 66.751%

Pull Request #1071: Fast desihealpix [bump minor]
Committed by web-flow via GitHub: Merge 982a0f12c into fb832aeda

1824 of 3127 branches covered (58.33%); branch coverage is included in the aggregate %
53 of 83 new or added lines in 2 files covered (63.86%)
209 existing lines in 1 file are now uncovered
6278 of 8978 relevant lines covered (69.93%)
2.1 hits per line

Source file: /py/picca/delta_extraction/data_catalogues/desi_healpix_fast.py (file coverage: 48.39%)

"""This module defines the class DesiHealpixFast to load DESI data
"""
import logging
import multiprocessing
import time
import itertools

import fitsio
import numpy as np

from picca.delta_extraction.astronomical_objects.desi_forest import DesiForest
from picca.delta_extraction.data_catalogues.desi_data import (  # pylint: disable=unused-import
    DesiData,
    DesiDataFileHandler,
    accepted_options,
    defaults,
    merge_new_forest,
    verify_exposures_shape,
)
from picca.delta_extraction.errors import DataError


class DesiHealpixFast(DesiData):
    """Reads the spectra from DESI using healpix mode and formats the data as a
    list of Forest instances.

    Methods
    -------
    (see DesiData in py/picca/delta_extraction/data_catalogues/desi_data.py)
    __init__
    __parse_config
    get_filename
    read_data
    read_file

    Attributes
    ----------
    (see DesiData in py/picca/delta_extraction/data_catalogues/desi_data.py)

    logger: logging.Logger
    Logger object

    num_processors: int
    Number of processors to be used for parallel reading
    """

    def __init__(self, config):
        """Initialize class instance

        Arguments
        ---------
        config: configparser.SectionProxy
        Parsed options to initialize class
        """
        self.logger = logging.getLogger(__name__)

        super().__init__(config)

    def get_filename(self, survey, healpix, coadd_name):
        """Get the name of the file to read

        Arguments
        ---------
        survey: str
        Name of the survey (sv, sv1, sv2, sv3, main, special)

        healpix: int
        Healpix of observations

        coadd_name: str
        Type of spectra file to read ("spectra" or "coadd")

        Return
        ------
        filename: str
        The name of the file to read
        """
        filename = (
            f"{self.input_directory}/{survey}/dark/{healpix//100}/{healpix}/{coadd_name}-{survey}-"
            f"dark-{healpix}.fits")
        # TODO: not sure if we want the dark survey to be hard coded
        # in here, probably won't run on anything else, but still
        return filename
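    # Worked example of the path layout (hypothetical values, not from the original
    # file): with input_directory="/data", survey="main", healpix=12345 and
    # coadd_name="coadd", the filename expands to
    #     /data/main/dark/123/12345/coadd-main-dark-12345.fits
    # since healpix // 100 == 123.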

    def read_data(self, is_mock=False):
        """Read the data.

        Method used to read healpix-based survey data.

        Return
        ------
        is_mock: bool
        False for DESI data and True for mocks

        Raise
        -----
        DataError if no quasars were found
        """
        coadd_name = "spectra" if self.use_non_coadded_spectra else "coadd"

        grouped_catalogue = self.catalogue.group_by(["HEALPIX", "SURVEY"])

        arguments = [
            (self.get_filename(group["SURVEY"][0], group["HEALPIX"][0], coadd_name), group)
            for group in grouped_catalogue.groups
        ]

        self.logger.info(f"reading data from {len(arguments)} files")
        if self.num_processors > 1:
            context = multiprocessing.get_context('fork')
            with context.Pool(processes=self.num_processors) as pool:
                imap_it = pool.imap(
                    DesiHealpixFileHandler(self.logger), arguments)
                t0 = time.time()
                self.logger.progress("Merging threads")

                self.forests = combine_results(imap_it)
                t1 = time.time()
                self.logger.progress(f"Time spent merging threads: {t1-t0}")

        if len(self.forests) == 0:
            raise DataError("No quasars found, stopping here")

        return is_mock
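    # A minimal sketch of the parallel pattern used above (the logger name and the
    # process count are illustrative): each worker applies the picklable callable
    # DesiHealpixFileHandler to one (filename, catalogue) tuple, and the per-file
    # lists are then flattened, roughly
    #
    #     handler = DesiHealpixFileHandler(logging.getLogger(__name__))
    #     with multiprocessing.get_context("fork").Pool(processes=4) as pool:
    #         forests = combine_results(pool.imap(handler, arguments))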


# Class to read in parallel
# It seems lightweight enough to copy these arguments
class DesiHealpixFileHandler():
    """File handler for class DesiHealpixFast

    Methods
    -------
    (see DesiDataFileHandler in py/picca/delta_extraction/data_catalogues/desi_data.py)
    read_file

    Attributes
    ----------
    (see DesiDataFileHandler in py/picca/delta_extraction/data_catalogues/desi_data.py)
    """
    def __init__(self, logger):
        """Initialize file handler

        Arguments
        ---------
        logger: logging.Logger
        Logger object from the parent class. Initializing it here instead of
        copying it made data_tests.py fail.
        """
        # The next line gives failed tests
        # self.logger = logging.getLogger(__name__)
        self.logger = logger

    def __call__(self, args):
        """Call method read_file. Note that imap passes a single argument,
        hence the tuple.

        Arguments
        ---------
        args: tuple
        Arguments to be passed to read_file. Should contain a string with the
        filename and an astropy.table.Table with the quasar catalogue
        """
        return self.read_file(*args)
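    # For illustration (hypothetical filename): because imap passes a single item,
    #     handler(("/data/main/dark/123/12345/coadd-main-dark-12345.fits", group))
    # unpacks to
    #     handler.read_file("/data/main/dark/123/12345/coadd-main-dark-12345.fits", group)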

    def read_file(self, filename, catalogue):
        """Read the spectra and format the data as Forest instances.

        Arguments
        ---------
        filename: str
        Name of the file to read

        catalogue: astropy.table.Table
        The quasar catalogue fragment associated with this file

        Return
        ------
        forests: list of DesiForest
        The forests read from the file (empty if the file could not be read)
        """
        try:
            hdul = fitsio.FITS(filename)
        except IOError:
            self.logger.warning(f"Error reading '{filename}'. Ignoring file")
            return []
        # Read targetid from fibermap to match to catalogue later
        fibermap = hdul['FIBERMAP'].read()

        colors = ["B", "R"]
        if "Z_FLUX" in hdul:
            colors.append("Z")

        # read wavelength
        wave = np.hstack([hdul[f"{color}_WAVELENGTH"].read() for color in colors])
        log_lambda = np.log10(wave)
        # read flux and ivar
        flux_colors = {
            color: hdul[f"{color}_FLUX"].read() for color in colors
        }
        ivar_colors = {
            color: (hdul[f"{color}_IVAR"].read() * (hdul[f"{color}_MASK"].read() == 0))
            for color in colors
        }

        # Loop over quasars in catalogue fragment
        forests = []
        for row in catalogue:
            # Find which row in the file contains this quasar
            # It should be there by construction
            targetid = row["TARGETID"]
            w_t = np.where(fibermap["TARGETID"] == targetid)[0]
            if len(w_t) == 0:
                self.logger.warning(
                    f"Error reading {targetid}. Ignoring object")
                continue

            w_t = w_t[0]
            # Construct DesiForest instance
            # Fluxes from the different spectrographs will be coadded
            flux = np.hstack([item[w_t] for item in flux_colors.values()])
            ivar = np.hstack([item[w_t] for item in ivar_colors.values()])

            args = {
                "flux": flux,
                "ivar": ivar,
                "targetid": targetid,
                "ra": row['RA'],
                "dec": row['DEC'],
                "z": row['Z'],
                "log_lambda": log_lambda,
            }
            forest = DesiForest(**args)
            forest.rebin()
            forests.append(forest)

        return forests
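    # Note on the data layout (illustrative, not from the original file): each arm
    # contributes one block of pixels, so after np.hstack a target's flux, ivar and
    # log_lambda vectors share a single concatenated B+R(+Z) grid; pixels flagged in
    # the *_MASK extensions end up with ivar == 0 and therefore carry no weight
    # downstream.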


def combine_results(lists):
    """Combine the content of all the lists into a single one

    Arguments
    ---------
    lists: list of lists
    The content to be merged

    Return
    ------
    combined_list: list
    The combined list
    """
    # Combine a list of lists into a single list
    return list(itertools.chain(*lists))
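A minimal sketch of combine_results on its own (the list contents below are
placeholder strings rather than real DesiForest objects; assumes picca is importable):

    from picca.delta_extraction.data_catalogues.desi_healpix_fast import combine_results

    # flatten the per-file result lists, exactly as read_data does with the imap iterator
    per_file_results = [["forest_a", "forest_b"], [], ["forest_c"]]
    print(combine_results(per_file_results))  # ['forest_a', 'forest_b', 'forest_c']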