• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

igmhub / picca / 10906689253

17 Sep 2024 03:56PM UTC coverage: 67.099% (+0.06%) from 67.036%
10906689253

Pull #1074

gihtub

web-flow
Merge 083a061d5 into 935e87ec3
Pull Request #1074: [bump minor] add IVAR to all forest outputs

1870 of 3251 branches covered (57.52%)

Branch coverage included in aggregate %.

62 of 93 new or added lines in 4 files covered. (66.67%)

195 existing lines in 1 file now uncovered.

6665 of 9469 relevant lines covered (70.39%)

2.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.94
/py/picca/delta_extraction/data_catalogues/desi_healpix_fast.py
1
"""This module defines the class DesiData to load DESI data
2
"""
3
import logging
3✔
4
import multiprocessing
3✔
5
import time
3✔
6
import itertools
3✔
7

8
import fitsio
3✔
9
import numpy as np
3✔
10

11
from picca.delta_extraction.astronomical_objects.desi_forest import DesiForest
3✔
12
from picca.delta_extraction.data_catalogues.desi_data import (  # pylint: disable=unused-import
3✔
13
    DesiData,
14
    DesiDataFileHandler,
15
    accepted_options,
16
    defaults,
17
    merge_new_forest,
18
    verify_exposures_shape,
19
)
20
from picca.delta_extraction.errors import DataError
3✔
21

22

23
class DesiHealpixFast(DesiData):
    """Reads the spectra from DESI using healpix mode and formats its data as a
    list of Forest instances.

    Methods
    -------
    (see DesiData in py/picca/delta_extraction/data_catalogues/desi_data.py)
    __init__
    get_filename
    read_data

    Attributes
    ----------
    (see DesiData in py/picca/delta_extraction/data_catalogues/desi_data.py)

    logger: logging.Logger
    Logger object

    num_processors: int
    Number of processors to be used for parallel reading
    """

    def __init__(self, config):
        """Initialize class instance

        Arguments
        ---------
        config: configparser.SectionProxy
        Parsed options to initialize class

        Raise
        -----
        NotImplementedError if the analysis type is "PK 1D"; the fast reader
        does not load the resolution information those analyses need
        """
        self.logger = logging.getLogger(__name__)

        super().__init__(config)

        # TODO: remove exception when this is implemented
        if self.analysis_type == "PK 1D":
            raise NotImplementedError(
                "fast healpix reading is not implemented for PK 1D analyses")

    def get_filename(self, survey, healpix, coadd_name):
        """Get the name of the file to read

        Arguments
        ---------
        survey: str
        Name of the survey (sv, sv1, sv2, sv3, main, special)

        healpix: int
        Healpix of observations

        coadd_name: str
        Basename of the spectra files ("spectra" or "coadd")

        Return
        ------
        filename: str
        The name of the file to read
        """
        filename = (
            f"{self.input_directory}/{survey}/dark/{healpix//100}/{healpix}/{coadd_name}-{survey}-"
            f"dark-{healpix}.fits")
        # TODO: not sure if we want the dark survey to be hard coded
        # in here, probably won't run on anything else, but still
        return filename

    def read_data(self, is_mock=False):
        """Read the data.

        Method used to read healpix-based survey data.

        Arguments
        ---------
        is_mock: bool - Default: False
        False for DESI data and True for mocks

        Return
        ------
        is_mock: bool
        False for DESI data and True for mocks

        Raise
        -----
        DataError if no quasars were found
        """
        coadd_name = "spectra" if self.use_non_coadded_spectra else "coadd"

        grouped_catalogue = self.catalogue.group_by(["HEALPIX", "SURVEY"])

        # one (filename, catalogue fragment) pair per healpix/survey group
        arguments = [
            (self.get_filename(group["SURVEY"][0], group["HEALPIX"][0], coadd_name), group)
            for group in grouped_catalogue.groups
        ]

        self.logger.info(f"reading data from {len(arguments)} files")
        if self.num_processors > 1:
            context = multiprocessing.get_context('fork')
            with context.Pool(processes=self.num_processors) as pool:
                imap_it = pool.imap(
                    DesiHealpixFileHandler(self.logger), arguments)
                t0 = time.time()
                self.logger.progress("Merging threads")

                self.forests = combine_results(imap_it)
                t1 = time.time()
                self.logger.progress(f"Time spent merging threads: {t1-t0}")
        else:
            # Serial fallback: previously this case left self.forests unset,
            # so running with a single processor could never load any data.
            reader = DesiHealpixFileHandler(self.logger)
            self.forests = combine_results(map(reader, arguments))

        if len(self.forests) == 0:
            raise DataError("No quasars found, stopping here")

        return is_mock
130

131

132
# Class to read in parallel
133
# Seems lightweight to copy all these 3 arguments
134
class DesiHealpixFileHandler():
    """File handler for class DesiHealpixFast

    Instances are picklable and are used as the worker callable for
    multiprocessing.Pool.imap, reading one healpix spectra file each.

    Methods
    -------
    (see DesiDataFileHandler in py/picca/delta_extraction/data_catalogues/desi_data.py)
    read_file

    Attributes
    ----------
    (see DesiDataFileHandler in py/picca/delta_extraction/data_catalogues/desi_data.py)
    """
    def __init__(self, logger):
        """Initialize file handler

        Arguments
        ---------
        logger: logging.Logger
        Logger object from parent class. Trying to initialize it here
        without copying failed data_tests.py
        """
        # The next line gives failed tests
        # self.logger = logging.getLogger(__name__)
        self.logger = logger

    def __call__(self, args):
        """Call method read_file. Note imap can be called with
        only one argument, hence tuple as argument.

        Arguments
        ---------
        args: tuple
        Arguments to be passed to read_file. Should contain a string with the
        filename and a astropy.table.Table with the quasar catalogue

        Return
        ------
        forests: list of DesiForest
        The loaded forests (see read_file)
        """
        return self.read_file(*args)

    def read_file(self, filename, catalogue):
        """Read the spectra and format its data as Forest instances.

        Arguments
        ---------
        filename: str
        Name of the file to read

        catalogue: astropy.table.Table
        The quasar catalogue fragment associated with this file

        Return
        ------
        forests: list of DesiForest
        The forests read from the file. Empty if the file could not be read.
        """
        try:
            hdul = fitsio.FITS(filename)
        except IOError:
            self.logger.warning(f"Error reading {filename}. Ignoring file")
            # Return the same type as the success path so combine_results
            # can safely chain the per-file results together.
            return []

        # close the FITS handle as soon as all extensions are read
        with hdul:
            # Read targetid from fibermap to match to catalogue later
            fibermap = hdul['FIBERMAP'].read()

            colors = ["B", "R"]
            if "Z_FLUX" in hdul:
                colors.append("Z")

            # read wavelength
            wave = np.hstack(
                [hdul[f"{color}_WAVELENGTH"].read() for color in colors])
            log_lambda = np.log10(wave)
            # read flux and ivar; masked pixels get zero inverse variance
            flux_colors = {
                color: hdul[f"{color}_FLUX"].read() for color in colors
            }
            ivar_colors = {
                color: (hdul[f"{color}_IVAR"].read() *
                        (hdul[f"{color}_MASK"].read() == 0))
                for color in colors
            }

        # Loop over quasars in catalogue fragment
        forests = []
        for row in catalogue:
            # Find which row in tile contains this quasar
            # It should be there by construction
            targetid = row["TARGETID"]
            w_t = np.where(fibermap["TARGETID"] == targetid)[0]
            if len(w_t) == 0:
                self.logger.warning(
                    f"Error reading {targetid}. Ignoring object")
                continue

            w_t = w_t[0]
            # Construct DesiForest instance
            # Fluxes from the different spectrographs will be coadded
            flux = np.hstack([item[w_t] for item in flux_colors.values()])
            ivar = np.hstack([item[w_t] for item in ivar_colors.values()])

            args = {
                "flux": flux,
                "ivar": ivar,
                "targetid": targetid,
                "ra": row['RA'],
                "dec": row['DEC'],
                "z": row['Z'],
                "log_lambda": log_lambda,
            }
            forest = DesiForest(**args)
            forest.rebin()
            forests.append(forest)

        return forests
250

251

252
def combine_results(lists):
    """Flatten an iterable of lists into a single list.

    Arguments
    ---------
    lists: list of lists
    The content to be merged

    Return
    ------
    combined_list: list
    The combined list
    """
    combined_list = []
    for sublist in lists:
        combined_list.extend(sublist)
    return combined_list
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc