/timewise/wise_data_base.py
import abc
import backoff
import copy
import json
import logging
import multiprocessing as mp
import os
import queue
import requests
import subprocess
import threading
import time
import tqdm

import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyvo as vo
from astropy import constants
from astropy.cosmology import Planck18
from astropy.io import ascii
from astropy.table import Table
from astropy.coordinates import SkyCoord

from timewise.general import cache_dir, plots_dir, output_dir, logger_format, backoff_hndlr
from timewise.utils import StableTAPService

logger = logging.getLogger(__name__)


class WISEDataBase(abc.ABC):
    """
    Base class for WISE Data

    :param base_name: unique name to determine storage directories
    :type base_name: str
    :param parent_sample_class: class for parent sample
    :type parent_sample_class: `ParentSample` class
    :param min_sep_arcsec: minimum separation required to the parent sample sources
    :type min_sep_arcsec: float
    :param n_chunks: number of chunks in declination
    :type n_chunks: int
    """

    service_url = 'https://irsa.ipac.caltech.edu/TAP'
    service = StableTAPService(service_url)
    active_tap_phases = {"QUEUED", "EXECUTING", "RUN", "COMPLETED", "ERROR", "UNKNOWN"}
    running_tap_phases = ["QUEUED", "EXECUTING", "RUN"]
    done_tap_phases = {"COMPLETED", "ABORTED", "ERROR"}

    query_types = ['positional', 'by_allwise_id']

    table_names = pd.DataFrame([
        ('AllWISE Multiepoch Photometry Table', 'allwise_p3as_mep'),
        ('AllWISE Source Catalog', 'allwise_p3as_psd'),
        ('WISE 3-Band Cryo Single Exposure (L1b) Source Table', 'allsky_3band_p1bs_psd'),
        ('NEOWISE-R Single Exposure (L1b) Source Table', 'neowiser_p1bs_psd'),
    ], columns=['nice_table_name', 'table_name'])

    bands = ['W1', 'W2']
    flux_key_ext = "_flux"
    flux_density_key_ext = "_flux_density"
    mag_key_ext = "_mag"
    luminosity_key_ext = "_luminosity"
    error_key_ext = "_error"
    band_plot_colors = {'W1': 'r', 'W2': 'b'}

    photometry_table_keymap = {
        'AllWISE Multiepoch Photometry Table': {
            'flux': {
                'w1flux_ep':    f'W1{flux_key_ext}',
                'w1sigflux_ep': f'W1{flux_key_ext}{error_key_ext}',
                'w2flux_ep':    f'W2{flux_key_ext}',
                'w2sigflux_ep': f'W2{flux_key_ext}{error_key_ext}'
            },
            'mag': {
                'w1mpro_ep':    f'W1{mag_key_ext}',
                'w1sigmpro_ep': f'W1{mag_key_ext}{error_key_ext}',
                'w2mpro_ep':    f'W2{mag_key_ext}',
                'w2sigmpro_ep': f'W2{mag_key_ext}{error_key_ext}'
            }
        },
        'NEOWISE-R Single Exposure (L1b) Source Table': {
            'flux': {
                'w1flux':       f'W1{flux_key_ext}',
                'w1sigflux':    f'W1{flux_key_ext}{error_key_ext}',
                'w2flux':       f'W2{flux_key_ext}',
                'w2sigflux':    f'W2{flux_key_ext}{error_key_ext}'
            },
            'mag': {
                'w1mpro':       f'W1{mag_key_ext}',
                'w1sigmpro':    f'W1{mag_key_ext}{error_key_ext}',
                'w2mpro':       f'W2{mag_key_ext}',
                'w2sigmpro':    f'W2{mag_key_ext}{error_key_ext}'
            }
        }
    }

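    # The keymap above translates the raw IRSA column names of each table to the uniform
    # per-band keys used throughout timewise; e.g. for the AllWISE Multiepoch Photometry
    # Table the raw column 'w1mpro_ep' becomes 'W1_mag' and 'w1sigmpro_ep' becomes
    # 'W1_mag_error' (an illustrative reading of the dictionary, not an addition to it).
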
    # zero points come from https://wise2.ipac.caltech.edu/docs/release/allsky/expsup/sec4_4h.html#conv2flux
    # published in Jarrett et al. (2011): https://ui.adsabs.harvard.edu/abs/2011ApJ...735..112J/abstract
    magnitude_zeropoints = {
        'F_nu': {
            'W1': 309.54 * u.Jy,
            'W2': 171.787 * u.Jy
        },
        'Fstar_nu': {
            'W1': 306.682 * u.Jy,
            'W2': 170.663 * u.Jy
        },
        'Mag': {
            'W1': 20.752,
            'W2': 19.596
        }
    }

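    # The 'F_nu' zero points above are the ones used by vegamag_to_flux_density further
    # down in this file, which evaluates F = (F_nu / f_c) * 10**(-m / 2.5) for a Vega
    # magnitude m and colour correction f_c.
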
    aperture_corrections = {
        'W1': 0.222,
        'W2': 0.280
    }

    _this_dir = os.path.abspath(os.path.dirname(__file__))
    magnitude_zeropoints_corrections = ascii.read(f'{_this_dir}/wise_flux_conversion_correction.dat',
                                                  delimiter='\t').to_pandas()

    band_wavelengths = {
        'W1': 3.368 * 1e-6 * u.m,
        'W2': 4.618 * 1e-6 * u.m
    }

    constraints = [
        "nb < 2",
        "na < 1",
        "cc_flags like '00%'",
        "qi_fact >= 1",
        "saa_sep >= 5",
        "moon_masked like '00%'"
    ]

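    # The constraints above are quality cuts applied to every photometry query (Gator and
    # TAP alike). To the best of our reading of the IRSA column documentation they keep
    # unblended (nb < 2), not actively deblended (na < 1), uncontaminated
    # (cc_flags like '00%') detections from good-quality frames (qi_fact >= 1) that are
    # well separated from the South Atlantic Anomaly (saa_sep >= 5) and not moon-masked
    # in W1/W2 (moon_masked like '00%').
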
    parent_wise_source_id_key = 'AllWISE_id'
    parent_sample_wise_skysep_key = 'sep_to_WISE_source'

    def __init__(self,
                 base_name,
                 parent_sample_class,
                 min_sep_arcsec,
                 n_chunks):
        #######################################################################################
        # START SET-UP          #
        #########################

        self.parent_sample_class = parent_sample_class
        parent_sample = parent_sample_class() if parent_sample_class else None
        self.base_name = base_name
        self.min_sep = min_sep_arcsec * u.arcsec
        self._n_chunks = n_chunks

        # --------------------------- vvvv set up parent sample vvvv --------------------------- #
        self.parent_ra_key = parent_sample.default_keymap['ra'] if parent_sample else None
        self.parent_dec_key = parent_sample.default_keymap['dec'] if parent_sample else None
        self.parent_wise_source_id_key = WISEDataBase.parent_wise_source_id_key
        self.parent_sample_wise_skysep_key = WISEDataBase.parent_sample_wise_skysep_key
        self.parent_sample_default_entries = {
            self.parent_wise_source_id_key: "",
            self.parent_sample_wise_skysep_key: np.inf
        }

        self.parent_sample = parent_sample

        if self.parent_sample:
            for k, default in self.parent_sample_default_entries.items():
                if k not in parent_sample.df.columns:
                    self.parent_sample.df[k] = default

            self._no_allwise_source = self.parent_sample.df[self.parent_sample_wise_skysep_key] == np.inf

        else:
            self._no_allwise_source = None
        # --------------------------- ^^^^ set up parent sample ^^^^ --------------------------- #

        # set up directories
        self.cache_dir = os.path.join(cache_dir, base_name)
        self._cache_photometry_dir = os.path.join(self.cache_dir, "photometry")
        self.cluster_dir = os.path.join(self.cache_dir, 'cluster')
        self.cluster_log_dir = os.path.join(self.cluster_dir, 'logs')
        self.output_dir = os.path.join(output_dir, base_name)
        self.lightcurve_dir = os.path.join(self.output_dir, "lightcurves")
        self.plots_dir = os.path.join(plots_dir, base_name)

        for d in [self.cache_dir, self._cache_photometry_dir, self.cluster_dir, self.cluster_log_dir,
                  self.output_dir, self.lightcurve_dir, self.plots_dir]:
            if not os.path.isdir(d):
                logger.debug(f"making directory {d}")
                os.makedirs(d)

        file_handler = logging.FileHandler(filename=self.cache_dir + '/log.err', mode="a")
        file_handler.setLevel("WARNING")
        file_handler.setFormatter(logger_format)
        logger.addHandler(file_handler)

        self.submit_file = os.path.join(self.cluster_dir, 'submit.txt')

        # set up result attributes
        self._split_chunk_key = '__chunk'
        self._cached_raw_photometry_prefix = 'raw_photometry'
        self.tap_jobs = None
        self.queue = None
        self.clear_unbinned_photometry_when_binning = False
        self._cached_final_products = {
            'lightcurves': dict(),
            'metadata': dict()
        }

        self._tap_wise_id_key = 'wise_id'
        self._tap_orig_id_key = 'orig_id'

        # Any class that wants to implement cluster operation has to use this variable.
        # It specifies which chunks will be processed by which jobs.
        self.cluster_jobID_map = None

        #########################
        # END SET-UP            #
        #######################################################################################

        #######################################################################################
        # START CHUNK MASK      #
        #########################

        self.chunk_map = None
        self.n_chunks = self._n_chunks

    @property
    def n_chunks(self):
        return self._n_chunks

    @n_chunks.setter
    def n_chunks(self, value):
        """Sets the private variable _n_chunks and re-calculates the declination interval masks"""

        if value > 50:
            logger.warning(f"Very large number of chunks ({value})! "
                           f"Pay attention when getting photometry to not kill IRSA!")

        if self.parent_sample:

            self.chunk_map = np.zeros(len(self.parent_sample.df))
            N_in_chunk = int(round(len(self.chunk_map) / self._n_chunks))
            for i in range(self._n_chunks):
                start_ind = i * N_in_chunk
                end_ind = start_ind + N_in_chunk
                self.chunk_map[start_ind:end_ind] = int(i)

            self._n_chunks = int(max(self.chunk_map)) + 1

            if self._n_chunks != value:
                logger.info(f"All objects included in {self._n_chunks:.0f} chunks.")

        else:
            logger.warning("No parent sample given! Cannot calculate dec interval masks!")

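    # Illustrative example of the chunking above (not part of the original file): for a
    # parent sample of 100 objects and n_chunks = 4, N_in_chunk = 25 and chunk_map becomes
    # [0]*25 + [1]*25 + [2]*25 + [3]*25, so chunk i simply covers rows i*25 ... (i+1)*25 - 1.
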
    def _get_chunk_number(self, wise_id=None, parent_sample_index=None):
        if isinstance(wise_id, type(None)) and isinstance(parent_sample_index, type(None)):
            raise Exception

        if not isinstance(wise_id, type(None)):
            parent_sample_index = np.where(self.parent_sample.df[self.parent_wise_source_id_key] == int(wise_id))[0]
            logger.debug(f"wise ID {wise_id} at index {parent_sample_index}")

        loc = self.parent_sample.df.loc[int(parent_sample_index)].name
        iloc = self.parent_sample.df.index.get_loc(loc)
        _chunk_number = int(self.chunk_map[int(iloc)])
        logger.debug(f"chunk number is {_chunk_number} for {parent_sample_index}")
        return _chunk_number

        #########################
        # END CHUNK MASK        #
        #######################################################################################

    def _start_data_product(self, parent_sample_indices):

        # get all rows in this chunk and the columns specified in the keymap
        parent_sample_sel = self.parent_sample.df.loc[
            parent_sample_indices,
            list(self.parent_sample.default_keymap.values())
        ]

        # invert the keymap to rename the columns
        inverse_keymap = {v: k for k, v in self.parent_sample.default_keymap.items()}
        parent_sample_sel.rename(columns=inverse_keymap, inplace=True)
        parent_sample_sel.set_index(parent_sample_sel.index.astype(str), inplace=True)

        # save to data_product
        data_product = parent_sample_sel.to_dict(orient="index")

        return data_product

    @staticmethod
    def get_db_name(table_name, nice=False):
        """
        Get the right table name

        :param table_name: str, table name
        :param nice: bool, whether to get the nice table name
        :return: str
        """
        source_column = 'nice_table_name' if not nice else 'table_name'
        target_column = 'table_name' if not nice else 'nice_table_name'

        m = WISEDataBase.table_names[source_column] == table_name
        if np.any(m):
            table_name = WISEDataBase.table_names[target_column][m].iloc[0]
        else:
            logger.debug(f"{table_name} not in Table. Assuming it is the right name already.")
        return table_name

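    # Illustrative usage (a sketch based on the table_names mapping above):
    #
    #   WISEDataBase.get_db_name('AllWISE Source Catalog')        # -> 'allwise_p3as_psd'
    #   WISEDataBase.get_db_name('allwise_p3as_psd', nice=True)   # -> 'AllWISE Source Catalog'
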
    ###########################################################################################################
    # START MATCH PARENT SAMPLE TO WISE SOURCES         #
    #####################################################

    def match_all_chunks(self,
                         table_name="AllWISE Source Catalog",
                         save_when_done=True,
                         additional_columns=None):
        """
        Match the parent sample to a WISE catalogue and add the result to the parent sample.

        :param table_name: The name of the table you want to match against
        :type table_name: str
        :param save_when_done: save the parent sample dataframe with the matching info when done
        :type save_when_done: bool
        :param additional_columns: optional, additional columns to add to the matching table
        :type additional_columns: list
        :return:
        """

        logger.info(f'matching all chunks to {table_name}')

        if additional_columns is None:
            additional_columns = []

        for i in range(self.n_chunks):
            self._match_single_chunk(i, table_name, additional_columns)

        _dupe_mask = self._get_dubplicated_wise_id_mask()

        self._no_allwise_source = self.parent_sample.df[self.parent_sample_wise_skysep_key] == np.inf
        if np.any(self._no_allwise_source):
            logger.warning(f"{len(self.parent_sample.df[self._no_allwise_source])} of {len(self.parent_sample.df)} "
                           f"entries without match!")

        if np.any(self._get_dubplicated_wise_id_mask()):
            logger.warning(self.parent_sample.df[self._get_dubplicated_wise_id_mask()])

        if save_when_done:
            self.parent_sample.save_local()

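    # Minimal usage sketch (hypothetical names, since WISEDataBase is abstract): with a
    # concrete subclass `MyWISEData` implementing bin_lightcurve and a parent sample class
    # `MyParentSample`, matching the whole sample would look roughly like
    #
    #   wd = MyWISEData("my_sample", MyParentSample, min_sep_arcsec=6, n_chunks=8)
    #   wd.match_all_chunks(table_name="AllWISE Source Catalog")
    #
    # which fills the 'AllWISE_id' and 'sep_to_WISE_source' columns of the parent sample.
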
    def _run_gator_match(self, in_file, out_file, table_name,
                         one_to_one=True, minsep_arcsec=None, additional_keys='', silent=False, constraints=None):
        _one_to_one = '-F one_to_one=1 ' if one_to_one else ''
        _minsep_arcsec = self.min_sep.to("arcsec").value if minsep_arcsec is None else minsep_arcsec
        _db_name = self.get_db_name(table_name)
        _silent = "-s " if silent else ""
        _constraints = '-F constraints="' + " and ".join(constraints).replace('%', '%%') + '" ' if constraints else ""

        if _db_name == "allwise_p3as_mep":
            _sigpos = _source_id = _des = ""
            _id_key = "cntr_mf,cntr"
        else:
            _sigpos = 'sigra,sigdec,'
            _source_id = "source_id,"
            _des = 'designation,' if 'allwise' in _db_name else ''
            _id_key = 'cntr' if 'allwise' in _db_name else 'allwise_cntr,cntr'

        submit_cmd = f'curl ' \
                     f'--connect-timeout 3600 ' \
                     f'--max-time 3600 ' \
                     f'{_silent}' \
                     f'-o {out_file} ' \
                     f'-F filename=@{in_file} ' \
                     f'-F catalog={_db_name} ' \
                     f'-F spatial=Upload ' \
                     f'-F uradius={_minsep_arcsec} ' \
                     f'-F outfmt=1 ' \
                     f'{_one_to_one}' \
                     f'{_constraints}' \
                     f'-F selcols={_des}{_source_id}ra,dec,{_sigpos}{_id_key}{additional_keys} ' \
                     f'"https://irsa.ipac.caltech.edu/cgi-bin/Gator/nph-query"'

        logger.debug(f'submit command: {submit_cmd}')
        N_tries = 10
        while True:
            try:
                process = subprocess.Popen(submit_cmd, stdout=subprocess.PIPE, shell=True)
                break
            except OSError as e:
                if N_tries < 1:
                    raise OSError(e)
                logger.warning(f"{e}, retry")
                N_tries -= 1

        out_msg, err_msg = process.communicate()
        if out_msg:
            logger.info(out_msg.decode())
        if err_msg:
            logger.error(err_msg.decode())
        process.terminate()
        if os.path.isfile(out_file):
            return 1
        else:
            return 0

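    # The command built above is a multipart-form POST (via curl) to the IRSA Gator
    # endpoint: it uploads the IPAC table `in_file`, cross-matches it against `catalog`
    # within `uradius` arcsec, applies the optional constraints string and writes the
    # selected columns to `out_file`. The return value is 1 if the output file exists
    # afterwards and 0 otherwise.
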
    def _match_to_wise(
            self,
            in_filename,
            out_filename,
            mask,
            table_name,
            N_retries=10,
            **gator_kwargs
    ):
        selected_parent_sample = copy.copy(
            self.parent_sample.df.loc[mask, [self.parent_ra_key, self.parent_dec_key]])
        selected_parent_sample.rename(columns={self.parent_dec_key: 'dec',
                                               self.parent_ra_key: 'ra'},
                                      inplace=True)
        logger.debug(f"{len(selected_parent_sample)} selected")

        # write to IPAC formatted table
        _selected_parent_sample_astrotab = Table.from_pandas(selected_parent_sample, index=True)
        logger.debug(f"writing {len(_selected_parent_sample_astrotab)} "
                     f"objects to {in_filename}")
        _selected_parent_sample_astrotab.write(in_filename, format='ipac', overwrite=True)
        _done = False

        while True:
            if N_retries == 0:
                raise RuntimeError('Failed with retries')

            try:
                # use Gator to query IRSA
                success = self._run_gator_match(in_filename, out_filename, table_name, **gator_kwargs)

                if not success:
                    # if not successful try again
                    logger.warning("no success, try again")
                    continue

                # load the result file
                gator_res = Table.read(out_filename, format='ipac')
                logger.debug(f"found {len(gator_res)} results")
                return gator_res

            except ValueError:
                # this will happen if the gator match returns an output containing the error message
                # read and display error message, then try again
                with open(out_filename, 'r') as f:
                    err_msg = f.read()
                logger.warning(f"{err_msg}: try again")

            finally:
                N_retries -= 1

    def _match_single_chunk(self, chunk_number, table_name, additional_columns=None):
        """
        Match the parent sample to WISE

        :param chunk_number: number of the declination chunk
        :type chunk_number: int
        :param table_name: optional, WISE table to match to, default is AllWISE Source Catalog
        :type table_name: str
        :param additional_columns: optional, additional columns to be added to the parent sample
        :type additional_columns: list
        """

        dec_intervall_mask = self.chunk_map == chunk_number
        logger.debug(f"Any selected: {np.any(dec_intervall_mask)}")
        _parent_sample_declination_band_file = os.path.join(self.cache_dir, f"parent_sample_chunk{chunk_number}.xml")
        _output_file = os.path.join(self.cache_dir, f"parent_sample_chunk{chunk_number}.tbl")

        additional_keys = (
            "," + ",".join(additional_columns)
            if (additional_columns is not None) and (len(additional_columns) > 0)
            else ""
        )

        gator_res = self._match_to_wise(
            in_filename=_parent_sample_declination_band_file,
            out_filename=_output_file,
            mask=dec_intervall_mask,
            table_name=table_name,
            additional_keys=additional_keys,
        )

        for fn in [_parent_sample_declination_band_file, _output_file]:
            try:
                logger.debug(f"removing {fn}")
                os.remove(fn)
            except FileNotFoundError:
                logger.warning(f"No File!!")

        # insert the corresponding separation to the WISE source into the parent sample
        self.parent_sample.df.loc[
            dec_intervall_mask,
            self.parent_sample_wise_skysep_key
        ] = list(gator_res["dist_x"])

        # insert the corresponding WISE IDs into the parent sample
        self.parent_sample.df.loc[
            dec_intervall_mask,
            self.parent_wise_source_id_key
        ] = list(gator_res["cntr"])

        if len(additional_columns) > 0:
            for col in additional_columns:
                logger.debug(f"inserting {col}")

                if col not in self.parent_sample.df.columns:
                    self.parent_sample.df[col] = np.nan

                self.parent_sample.df.loc[
                    dec_intervall_mask,
                    col
                ] = list(gator_res[col])

        _no_match_mask = self.parent_sample.df[self.parent_sample_wise_skysep_key].isna() & dec_intervall_mask
        for k, default in self.parent_sample_default_entries.items():
            self.parent_sample.df.loc[_no_match_mask, k] = default

    def _get_dubplicated_wise_id_mask(self):
        idf_sorted_sep = self.parent_sample.df.sort_values(self.parent_sample_wise_skysep_key)
        idf_sorted_sep['duplicate'] = idf_sorted_sep[self.parent_wise_source_id_key].duplicated(keep='first')
        idf_sorted_sep.sort_index(inplace=True)
        _inf_mask = idf_sorted_sep[self.parent_sample_wise_skysep_key] < np.inf
        _dupe_mask = idf_sorted_sep['duplicate'] & (_inf_mask)
        if np.any(_dupe_mask):
            _N_dupe = len(self.parent_sample.df[_dupe_mask])
            logger.info(f"{_N_dupe} duplicated entries in parent sample")
        return _dupe_mask

    ###################################################
    # END MATCH PARENT SAMPLE TO WISE SOURCES         #
    ###########################################################################################################

    ###########################################################################################################
    # START GET PHOTOMETRY DATA       #
    ###################################

    def get_photometric_data(self, tables=None, perc=1, wait=0, service=None, nthreads=100,
                             chunks=None, overwrite=True, remove_chunks=False, query_type='positional',
                             skip_download=False, mask_by_position=False):
        """
        Load photometric data from the IRSA server for the matched sample. The result will be saved under

            </path/to/timewise/data/dir>/output/<base_name>/lightcurves/binned_lightcurves_<service>.json

        :param remove_chunks: remove single chunk files after binning
        :type remove_chunks: bool
        :param overwrite: overwrite already existing lightcurves and metadata
        :type overwrite: bool
        :param tables: WISE tables to use for photometry query, defaults to AllWISE and NEOWISE-R photometry
        :type tables: str or list-like
        :param perc: percentage of sources to load photometry for, default 1
        :type perc: float
        :param nthreads: max number of threads to launch
        :type nthreads: int
        :param service: either of 'gator' or 'tap', selected based on elements per chunk by default
        :type service: str
        :param wait: time in hours to wait after submitting TAP jobs
        :type wait: float
        :param chunks: containing indices of chunks to download
        :type chunks: list-like
        :param query_type: 'positional': query photometry based on distance from object, 'by_allwise_id': select all photometry points within a radius of 50 arcsec with the corresponding AllWISE ID
        :type query_type: str
        :param skip_download: if `True` skip downloading and only do binning
        :type skip_download: bool
        :param mask_by_position: if `True` mask single exposures that are too far away from the bulk
        :type mask_by_position: bool
        """

        mag = True
        flux = True

        if tables is None:
            tables = [
                'AllWISE Multiepoch Photometry Table',
                'NEOWISE-R Single Exposure (L1b) Source Table'
            ]

        if query_type not in self.query_types:
            raise ValueError(f"Unknown query type {query_type}! Choose one of {self.query_types}")

        if chunks is None:
            chunks = list(range(round(int(self.n_chunks * perc))))
        else:
            cm = [c not in self.chunk_map for c in chunks]
            if np.any(cm):
                raise ValueError(f"Chunks {np.array(chunks)[cm]} are not in chunk map. "
                                 f"Probably they are larger than the set chunk number of {self._n_chunks}")

        if service is None:
            elements_per_chunk = len(self.parent_sample.df) / self.n_chunks
            service = 'tap' if elements_per_chunk > 300 else 'gator'

        if (query_type == 'by_allwise_id') and (service == 'gator'):
            raise ValueError(f"Query type 'by_allwise_id' only implemented for service 'tap'!")

        if not skip_download:

            logger.debug(f"Getting {perc * 100:.2f}% of lightcurve chunks ({len(chunks)}) via {service} "
                         f"in {'magnitude' if mag else ''} {'flux' if flux else ''} "
                         f"from {tables}")

            if service == 'tap':
                self._query_for_photometry(tables, chunks, wait, mag, flux, nthreads, query_type)

            elif service == 'gator':
                self._query_for_photometry_gator(tables, chunks, mag, flux, nthreads)

        else:
            logger.info("skipping download, assume data is already downloaded.")

        self._select_individual_lightcurves_and_bin(service=service, chunks=chunks, mask_by_position=mask_by_position)
        for c in chunks:
            self.calculate_metadata(service=service, chunk_number=c, overwrite=True)

        self._combine_data_products(service=service, remove=remove_chunks, overwrite=overwrite)

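    # Minimal usage sketch (hypothetical instance `wd` of a concrete subclass): after
    # matching, downloading and binning the photometry of all chunks via the TAP service
    # could look roughly like
    #
    #   wd.get_photometric_data(service='tap', nthreads=10, query_type='positional')
    #
    # which downloads the raw single-exposure photometry per chunk, bins it, calculates
    # the metadata and combines everything into one data product per service.
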
    def _data_product_filename(self, service, chunk_number=None, jobID=None):

        n = "timewise_data_product_"

        if (chunk_number is None) and (jobID is None):
            return os.path.join(self.lightcurve_dir, f"{n}{service}.json")
        else:
            fn = f"{n}{service}{self._split_chunk_key}{chunk_number}"
            if (chunk_number is not None) and (jobID is None):
                return os.path.join(self._cache_photometry_dir, fn + ".json")
            else:
                return os.path.join(self._cache_photometry_dir, fn + f"_{jobID}.json")

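    # Example filenames produced by the logic above (with service='tap' and
    # _split_chunk_key='__chunk'):
    #
    #   <lightcurve_dir>/timewise_data_product_tap.json                    # combined product
    #   <cache photometry dir>/timewise_data_product_tap__chunk0.json      # single chunk
    #   <cache photometry dir>/timewise_data_product_tap__chunk0_3.json    # chunk 0, job 3
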
    def load_data_product(self, service, chunk_number=None, jobID=None, return_filename=False):
1✔
644
        """
645
        Load data product from disk
646

647
        :param service: service used to download data ('tap' or 'gator')
648
        :type service: str
649
        :param chunk_number: chunk number to load, if None load combined file for this service
650
        :type chunk_number: int, optional
651
        :param jobID: jobID to load, if None load the combined file for this chunk
652
        :type jobID: int, optional
653
        :param return_filename: return filename of data product, defaults to False
654
        """
655
        fn = self._data_product_filename(service, chunk_number, jobID)
1✔
656
        logger.debug(f"loading {fn}")
1✔
657
        try:
1✔
658
            with open(fn, "r") as f:
1✔
659
                lcs = json.load(f)
1✔
660
            if return_filename:
1✔
661
                return lcs, fn
1✔
662
            return lcs
1✔
663
        except FileNotFoundError:
1✔
664
            logger.warning(f"No file {fn}")
1✔
665

666
    def _save_data_product(self, data_product, service, chunk_number=None, jobID=None, overwrite=False):
1✔
667
        fn = self._data_product_filename(service, chunk_number, jobID)
1✔
668
        logger.debug(f"saving {len(data_product)} new lightcurves to {fn}")
1✔
669

670
        if fn == self._data_product_filename(service):
1✔
671
            self._cached_final_products['lightcurves'][service] = data_product
1✔
672

673
        if not overwrite:
1✔
674
            try:
×
675
                old_data_product = self.load_data_product(service=service, chunk_number=chunk_number, jobID=jobID)
×
676
                logger.debug(f"Found {len(old_data_product)}. Combining")
×
677
                data_product = data_product.update(old_data_product)
×
678
            except FileNotFoundError as e:
×
679
                logger.info(f"FileNotFoundError: {e}. Making new binned lightcurves.")
×
680

681
        with open(fn, "w") as f:
1✔
682
            json.dump(data_product, f, indent=4)
1✔
683

684
    def _combine_data_products(
1✔
685
            self,
686
            service=None,
687
            chunk_number=None,
688
            remove=False,
689
            overwrite=False
690
    ):
691
        if not service:
1✔
692
            logger.info("Combining all lightcuves collected with all services")
×
693
            itr = ['service', ['gator', 'tap']]
×
694
            kwargs = {}
×
695
        elif chunk_number is None:
1✔
696
            logger.info(f"Combining all lightcurves collected with {service}")
1✔
697
            itr = ['chunk_number', range(self.n_chunks)]
1✔
698
            kwargs = {'service': service}
1✔
699
        elif chunk_number is not None:
1✔
700
            logger.info(f"Combining all lightcurves collected with {service} for chunk {chunk_number}")
1✔
701
            itr = ['jobID',
1✔
702
                   list(self.clusterJob_chunk_map.index[self.clusterJob_chunk_map.chunk_number == chunk_number])]
703
            kwargs = {'service': service, 'chunk_number': chunk_number}
1✔
704
        else:
705
            raise NotImplementedError
706

707
        lcs = None
1✔
708
        fns = list()
1✔
709
        missing_files = 0
1✔
710
        for i in itr[1]:
1✔
711
            kw = dict(kwargs)
1✔
712
            kw[itr[0]] = i
1✔
713
            kw['return_filename'] = True
1✔
714
            res = self.load_data_product(**kw)
1✔
715

716
            if not isinstance(res, type(None)):
1✔
717
                ilcs, ifn = res
1✔
718
                fns.append(ifn)
1✔
719
                if isinstance(lcs, type(None)):
1✔
720
                    lcs = dict(ilcs)
1✔
721
                else:
722
                    lcs.update(ilcs)
1✔
723

724
            else:
725
                missing_files += 1
×
726

727
            if missing_files > 0:
1✔
728
                msg = f"Missing {missing_files} for {service}"
×
729
                if chunk_number is not None:
×
730
                    msg += f" chunk {chunk_number}"
×
731
                msg += "! Not saving data product"
×
732
                logger.warning(msg)
×
733
                break
×
734

735
        if missing_files == 0:
1✔
736
            self._save_data_product(lcs, service=service, chunk_number=chunk_number, overwrite=overwrite)
1✔
737

738
            if remove:
1✔
739
                for fn in tqdm.tqdm(fns, desc="removing files"):
1✔
740
                    os.remove(fn)
1✔
741

742
            return True
1✔
743

744
        else:
745
            return False
×
746

747
    # ----------------------------------------------------------------------------------- #
748
    # START using GATOR to get photometry        #
749
    # ------------------------------------------ #
750

751
    def _gator_chunk_photometry_cache_filename(self, table_nice_name, chunk_number,
1✔
752
                                               additional_neowise_query=False, gator_input=False):
753
        table_name = self.get_db_name(table_nice_name)
1✔
754
        _additional_neowise_query = '_neowise_gator' if additional_neowise_query else ''
1✔
755
        _gator_input = '_gator_input' if gator_input else ''
1✔
756
        _ending = '.xml' if gator_input else'.tbl'
1✔
757
        fn = f"{self._cached_raw_photometry_prefix}_{table_name}{_additional_neowise_query}{_gator_input}" \
1✔
758
             f"{self._split_chunk_key}{chunk_number}{_ending}"
759
        return os.path.join(self._cache_photometry_dir, fn)
1✔
760

761
    def _thread_query_photometry_gator(self, chunk_number, table_name, mag, flux):
1✔
762
        _infile = self._gator_chunk_photometry_cache_filename(table_name, chunk_number, gator_input=True)
1✔
763
        _outfile = self._gator_chunk_photometry_cache_filename(table_name, chunk_number)
1✔
764
        _nice_name = self.get_db_name(table_name, nice=True)
1✔
765
        _additional_keys_list = ['mjd']
1✔
766
        if mag:
1✔
767
            _additional_keys_list += list(self.photometry_table_keymap[_nice_name]['mag'].keys())
1✔
768
        if flux:
1✔
769
            _additional_keys_list += list(self.photometry_table_keymap[_nice_name]['flux'].keys())
1✔
770

771
        _additional_keys = "," + ",".join(_additional_keys_list)
1✔
772
        _deci_mask = self.chunk_map == chunk_number
1✔
773
        _mask = _deci_mask #& (~self._no_allwise_source)
1✔
774

775
        res = self._match_to_wise(
1✔
776
            in_filename=_infile,
777
            out_filename=_outfile,
778
            mask=_mask,
779
            table_name=table_name,
780
            one_to_one=False,
781
            additional_keys=_additional_keys,
782
            minsep_arcsec=self.min_sep.to('arcsec').value,
783
            silent=True,
784
            constraints=self.constraints
785
        )
786

787
        os.remove(_infile)
1✔
788
        return res
1✔
789

790
    def _gator_photometry_worker_thread(self):
1✔
791
        while True:
1✔
792
            try:
1✔
793
                args = self.queue.get(block=False)
1✔
794
            except (AttributeError, queue.Empty):
1✔
795
                logger.debug('No more tasks, exiting')
1✔
796
                break
1✔
797
            logger.debug(f"{args}")
1✔
798
            self._thread_query_photometry_gator(*args)
1✔
799
            self.queue.task_done()
1✔
800
            logger.info(f"{self.queue.qsize()} tasks remaining")
1✔
801

802
    def _query_for_photometry_gator(self, tables, chunks, mag, flux, nthreads):
1✔
803
        nthreads = min(nthreads, len(chunks))
1✔
804
        logger.debug(f'starting {nthreads} workers')
1✔
805
        threads = [threading.Thread(target=self._gator_photometry_worker_thread) for _ in range(nthreads)]
1✔
806

807
        logger.debug(f"using {len(chunks)} chunks")
1✔
808
        self.queue = queue.Queue()
1✔
809
        for t in np.atleast_1d(tables):
1✔
810
            for i in chunks:
1✔
811
                self.queue.put([i, t, mag, flux])
1✔
812

813
        logger.info(f"added {self.queue.qsize()} tasks to queue")
1✔
814
        for t in threads:
1✔
815
            t.start()
1✔
816
        self.queue.join()
1✔
817
        self.queue = None
1✔
818

819
        for t in threads:
1✔
820
            t.join()
1✔
821

822
    def _get_unbinned_lightcurves_gator(self, chunk_number, clear=False):
1✔
823
        # load only the files for this chunk
824
        fns = [os.path.join(self._cache_photometry_dir, fn)
1✔
825
               for fn in os.listdir(self._cache_photometry_dir)
826
               if (fn.startswith(self._cached_raw_photometry_prefix) and
827
                   fn.endswith(f"{self._split_chunk_key}{chunk_number}.tbl"))
828
               ]
829

830
        logger.debug(f"chunk {chunk_number}: loading {len(fns)} files for chunk {chunk_number}")
1✔
831

832
        _data = list()
1✔
833
        for fn in fns:
1✔
834
            data_table = Table.read(fn, format='ipac').to_pandas()
1✔
835

836
            t = 'allwise_p3as_mep' if 'allwise' in fn else 'neowiser_p1bs_psd'
1✔
837
            nice_name = self.get_db_name(t, nice=True)
1✔
838
            cols = {'index_01': self._tap_orig_id_key}
1✔
839
            cols.update(self.photometry_table_keymap[nice_name]['mag'])
1✔
840
            cols.update(self.photometry_table_keymap[nice_name]['flux'])
1✔
841
            if 'allwise' in fn:
1✔
842
                cols['cntr_mf'] = 'allwise_cntr'
1✔
843

844
            data_table = data_table.rename(columns=cols)
1✔
845
            _data.append(data_table)
1✔
846

847
            if clear:
1✔
848
                os.remove(fn)
×
849

850
        lightcurves = pd.concat(_data)
1✔
851
        return lightcurves
1✔
852

853
    # ------------------------------------------ #
854
    # END using GATOR to get photometry          #
855
    # ----------------------------------------------------------------------------------- #
856

857
    # ----------------------------------------------------------------------------------- #
858
    # START using TAP to get photometry        #
859
    # ---------------------------------------- #
860

861
    def _get_photometry_query_string(self, table_name, mag, flux, query_type):
1✔
862
        """
863
        Construct a query string to submit to IRSA
864
        :param table_name: str, table name
865
        :type table_name: str
866
        :return: str
867
        """
868
        logger.debug(f"constructing query for {table_name}")
1✔
869
        db_name = self.get_db_name(table_name)
1✔
870
        nice_name = self.get_db_name(table_name, nice=True)
1✔
871
        id_key = 'cntr_mf' if 'allwise' in db_name else 'allwise_cntr'
1✔
872
        lum_keys = list()
1✔
873
        if mag:
1✔
874
            lum_keys += list(self.photometry_table_keymap[nice_name]['mag'].keys())
1✔
875
        if flux:
1✔
876
            lum_keys += list(self.photometry_table_keymap[nice_name]['flux'].keys())
1✔
877
        keys = ['ra', 'dec', 'mjd', id_key] + lum_keys
1✔
878
        _constraints = list(self.constraints)
1✔
879

880
        q = 'SELECT \n\t'
1✔
881
        for k in keys:
1✔
882
            q += f'{db_name}.{k}, '
1✔
883
        q += f'\n\tmine.{self._tap_orig_id_key} \n'
1✔
884
        q += f'FROM\n\tTAP_UPLOAD.ids AS mine \n'
1✔
885

886
        if query_type == 'positional':
1✔
887
            q += f'RIGHT JOIN\n\t{db_name} \n'
1✔
888
            radius = self.min_sep
1✔
889

890
        if query_type == 'by_allwise_id':
1✔
891
            q += f'INNER JOIN\n\t{db_name} ON {db_name}.{id_key} = mine.{self._tap_wise_id_key} \n'
1✔
892
            radius = 15 * u.arcsec
1✔
893

894
        q += 'WHERE \n'
1✔
895

896
        if query_type == 'positional':
1✔
897
            q += f"\tCONTAINS(POINT('J2000',{db_name}.ra,{db_name}.dec)," \
1✔
898
                 f"CIRCLE('J2000',mine.ra_in,mine.dec_in,{radius.to('deg').value}))=1 "
899

900
        if len(_constraints) > 0:
1✔
901

902
            if query_type == 'positional':
1✔
903
                q += ' AND (\n'
1✔
904

905
            for c in _constraints:
1✔
906
                q += f'\t{db_name}.{c} AND \n'
1✔
907
            q = q.strip(" AND \n")
1✔
908

909
            if query_type == 'positional':
1✔
910
                q += '\t)'
1✔
911

912
        logger.debug(f"\n{q}")
1✔
913
        return q
1✔
914

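    # For a positional query of the NEOWISE-R single-exposure table, the string built above
    # has roughly this shape (illustrative, with the constraints abbreviated):
    #
    #   SELECT
    #       neowiser_p1bs_psd.ra, neowiser_p1bs_psd.dec, neowiser_p1bs_psd.mjd,
    #       neowiser_p1bs_psd.allwise_cntr, ... , mine.orig_id
    #   FROM
    #       TAP_UPLOAD.ids AS mine
    #   RIGHT JOIN
    #       neowiser_p1bs_psd
    #   WHERE
    #       CONTAINS(POINT('J2000',neowiser_p1bs_psd.ra,neowiser_p1bs_psd.dec),
    #                CIRCLE('J2000',mine.ra_in,mine.dec_in,<min_sep in deg>))=1
    #       AND (neowiser_p1bs_psd.nb < 2 AND ... AND neowiser_p1bs_psd.moon_masked like '00%')
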
    def _submit_job_to_TAP(self, chunk_number, table_name, mag, flux, query_type):
        i = chunk_number
        t = table_name
        m = self.chunk_map == i

        # if perc is smaller than one select only a subset of wise IDs
        sel = self.parent_sample.df[np.array(m)]

        tab_d = dict()

        tab_d[self._tap_orig_id_key] = np.array(sel.index).astype(int)
        tab_d['ra_in'] = np.array(sel[self.parent_sample.default_keymap['ra']]).astype(float)
        tab_d['dec_in'] = np.array(sel[self.parent_sample.default_keymap['dec']]).astype(float)

        if query_type == 'by_allwise_id':
            tab_d[self._tap_wise_id_key] = np.array(sel[self.parent_wise_source_id_key]).astype(int)

        del sel

        logger.debug(f"{chunk_number}th query of {table_name}: uploading {len(list(tab_d.values())[0])} objects.")
        qstring = self._get_photometry_query_string(t, mag, flux, query_type)

        N_tries = 5
        while True:
            if N_tries == 0:
                logger.warning("No more tries left!")
                raise vo.dal.exceptions.DALServiceError(f"Submission failed "
                                                        f"for {i}th chunk "
                                                        f"of {t} "
                                                        f"after {N_tries} attempts")
            try:
                job = self.service.submit_job(qstring, uploads={'ids': Table(tab_d)})
                job.run()

                if isinstance(job.phase, type(None)):
                    raise vo.dal.DALServiceError(f"Job submission failed. No phase!")

                logger.info(f'submitted job for {t} for chunk {i}: ')
                logger.debug(f'Job: {job.url}; {job.phase}')
                self.tap_jobs[t][i] = job
                self.queue.put((t, i))
                break

            except (requests.exceptions.ConnectionError, vo.dal.exceptions.DALServiceError) as e:
                wait = 60
                N_tries -= 1
                logger.warning(f"{chunk_number}th query of {table_name}: Could not submit TAP job!\n"
                               f"{e}. Waiting {wait}s and try again. {N_tries} tries left.")
                time.sleep(wait)

    def _chunk_photometry_cache_filename(self, table_nice_name, chunk_number, additional_neowise_query=False):
        table_name = self.get_db_name(table_nice_name)
        _additional_neowise_query = '_neowise_gator' if additional_neowise_query else ''
        fn = f"{self._cached_raw_photometry_prefix}_{table_name}{_additional_neowise_query}" \
             f"{self._split_chunk_key}{chunk_number}.csv"
        return os.path.join(self._cache_photometry_dir, fn)

    @staticmethod
    def _give_up_tap(e):
        return ("Job is not active!" in str(e))

    @backoff.on_exception(
        backoff.expo,
        vo.dal.exceptions.DALServiceError,
        giveup=_give_up_tap,
        max_tries=50,
        on_backoff=backoff_hndlr
    )
    def _thread_wait_and_get_results(self, t, i):
        logger.info(f"Waiting on {i}th query of {t} ........")

        _job = self.tap_jobs[t][i]
        _job.wait()
        logger.info(f'{i}th query of {t}: Done!')

        lightcurve = _job.fetch_result().to_table().to_pandas()
        fn = self._chunk_photometry_cache_filename(t, i)
        logger.debug(f"{i}th query of {t}: saving under {fn}")

        table_nice_name = self.get_db_name(t, nice=True)
        cols = dict(self.photometry_table_keymap[table_nice_name]['mag'])
        cols.update(self.photometry_table_keymap[table_nice_name]['flux'])

        if 'allwise' in t:
            cols['cntr_mf'] = 'allwise_cntr'

        lightcurve.rename(columns=cols).to_csv(fn)
        return

    def _tap_photometry_worker_thread(self):
        while True:
            try:
                t, i = self.queue.get(block=False)
            except queue.Empty:
                logger.debug("No more tasks, exiting")
                break
            except AttributeError:
                logger.debug(f"No more queue. exiting")
                break

            job = self.tap_jobs[t][i]

            _ntries = 10
            while True:
                try:
                    job._update(timeout=600)
                    phase = job._job.phase
                    break
                except vo.dal.exceptions.DALServiceError as e:
                    msg = f"{i}th query of {t}: DALServiceError: {e}; trying again in 6 min"
                    if _ntries < 10:
                        msg += f' ({_ntries} tries left)'

                    logger.warning(msg)
                    time.sleep(60 * 6)
                    if '404 Client Error: Not Found for url' in str(e):
                        _ntries -= 1

            if phase in self.running_tap_phases:
                self.queue.put((t, i))
                self.queue.task_done()

            elif phase in self.done_tap_phases:
                self._thread_wait_and_get_results(t, i)
                self.queue.task_done()
                logger.info(f'{self.queue.qsize()} tasks left')

            else:
                logger.warning(f'queue {i} of {t}: Job not active! Phase is {phase}')

            time.sleep(np.random.uniform(60))

        logger.debug("closing thread")

    def _run_tap_worker_threads(self, nthreads):
        threads = [threading.Thread(target=self._tap_photometry_worker_thread)
                   for _ in range(nthreads)]

        for t in threads:
            t.start()

        try:
            self.queue.join()
        except KeyboardInterrupt:
            pass

        logger.info('all tap_jobs done!')
        for i, t in enumerate(threads):
            logger.debug(f"{i}th thread alive: {t.is_alive()}")

        for t in threads:
            t.join()
        self.tap_jobs = None
        del threads

    def _query_for_photometry(self, tables, chunks, wait, mag, flux, nthreads, query_type):
        # ----------------------------------------------------------------------
        #     Do the query
        # ----------------------------------------------------------------------
        self.tap_jobs = dict()
        self.queue = queue.Queue()
        tables = np.atleast_1d(tables)

        for t in tables:
            self.tap_jobs[t] = dict()
            for i in chunks:
                self._submit_job_to_TAP(i, t, mag, flux, query_type)
                time.sleep(5)

        logger.info(f'added {self.queue.qsize()} tasks to queue')
        logger.info(f"wait for {wait} hours to give tap_jobs some time")
        time.sleep(wait * 3600)
        nthreads = min(len(tables) * len(chunks), nthreads)
        self._run_tap_worker_threads(nthreads)
        self.queue = None

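    # Summary of the TAP workflow implemented above: for every (table, chunk) pair an
    # asynchronous TAP job is submitted with the per-chunk coordinates uploaded as
    # TAP_UPLOAD.ids, the job handle is put on a queue, and after an optional waiting
    # period worker threads poll the job phase. Running jobs are re-queued, finished jobs
    # are fetched and written to one raw-photometry CSV per table and chunk.
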
    # ----------------------------------------------------------------------
    #     select individual lightcurves and bin
    # ----------------------------------------------------------------------

    def _select_individual_lightcurves_and_bin(self, ncpu=35, service='tap', chunks=None, mask_by_position=False):
        logger.info('selecting individual lightcurves and binning ...')
        ncpu = min(self.n_chunks, ncpu)
        logger.debug(f"using {ncpu} CPUs")
        chunk_list = list(range(self.n_chunks)) if not chunks else chunks
        service_list = [service] * len(chunk_list)
        jobID_list = [None] * len(chunk_list)
        pos_mask_list = [mask_by_position] * len(chunk_list)
        logger.debug(f"multiprocessing arguments: chunks: {chunk_list}, service: {service_list}")

        while True:
            try:
                logger.debug(f'trying with {ncpu}')
                p = mp.Pool(ncpu)
                break
            except OSError as e:
                logger.warning(e)
                if ncpu == 1:
                    break
                ncpu = int(round(ncpu - 1))

        if ncpu > 1:
            r = list(
                tqdm.tqdm(
                    p.starmap(
                        self._subprocess_select_and_bin,
                        zip(service_list, chunk_list, jobID_list, pos_mask_list)
                    ),
                    total=self.n_chunks,
                    desc='select and bin'
                )
            )
            p.close()
            p.join()
        else:
            r = list(map(self._subprocess_select_and_bin, service_list, chunk_list, jobID_list, pos_mask_list))

    def get_unbinned_lightcurves(self, chunk_number, clear=False):
        """
        Get the unbinned lightcurves for a given chunk number.

        :param chunk_number: int
        :type chunk_number: int
        :param clear: remove files after loading, defaults to False
        :type clear: bool, optional
        """
        # load only the files for this chunk
        fns = [os.path.join(self._cache_photometry_dir, fn)
               for fn in os.listdir(self._cache_photometry_dir)
               if (fn.startswith(self._cached_raw_photometry_prefix) and fn.endswith(
                f"{self._split_chunk_key}{chunk_number}.csv"
            ))]
        logger.debug(f"chunk {chunk_number}: loading {len(fns)} files")

        if len(fns) == 0:
            raise ValueError(f"No unbinned lightcurves found for chunk {chunk_number}!")

        lightcurves = pd.concat([pd.read_csv(fn) for fn in fns])

        if clear:
            for fn in fns:
                os.remove(fn)

        return lightcurves

    def _subprocess_select_and_bin(self, service, chunk_number=None, jobID=None, mask_by_position=False):
        # run through the ids and bin the lightcurves
        if service == 'tap':
            lightcurves = self.get_unbinned_lightcurves(chunk_number, clear=self.clear_unbinned_photometry_when_binning)
        elif service == 'gator':
            lightcurves = self._get_unbinned_lightcurves_gator(
                chunk_number,
                clear=self.clear_unbinned_photometry_when_binning
            )
        else:
            raise ValueError(f"Service {service} not known!")

        if jobID:
            indices = np.where(self.cluster_jobID_map == jobID)[0]
        else:
            indices = lightcurves[self._tap_orig_id_key].unique()

        logger.debug(f"chunk {chunk_number}: going through {len(indices)} IDs")

        data_product = self.load_data_product(service=service, chunk_number=chunk_number, jobID=jobID)

        if data_product is None:
            logger.info(f"Starting data product for {len(indices)} indices.")
            data_product = self._start_data_product(parent_sample_indices=indices)

        if mask_by_position:
            bad_indices = self.get_position_mask(service, chunk_number)
        else:
            bad_indices = None

        for parent_sample_entry_id in tqdm.tqdm(indices, desc="binning"):
            m = lightcurves[self._tap_orig_id_key] == parent_sample_entry_id
            lightcurve = lightcurves[m]

            if (bad_indices is not None) and (parent_sample_entry_id in bad_indices):
                pos_m = ~lightcurve.index.isin(bad_indices)
                lightcurve = lightcurve[pos_m]

            if len(lightcurve) < 1:
                logger.warning(f"No data for {parent_sample_entry_id}")
                continue

            binned_lc = self.bin_lightcurve(lightcurve)
            data_product[str(int(parent_sample_entry_id))]["timewise_lightcurve"] = binned_lc.to_dict()

        logger.debug(f"chunk {chunk_number}: saving {len(data_product.keys())} binned lcs")
        self._save_data_product(data_product, service=service, chunk_number=chunk_number, jobID=jobID, overwrite=True)

    # ---------------------------------------- #
    # END using TAP to get photometry          #
    # ----------------------------------------------------------------------------------- #

    # ----------------------------------------------------------------------
    #     bin lightcurves
    # ----------------------------------------------------------------------

    @abc.abstractmethod
    def bin_lightcurve(self, lightcurve):
        """
        Bins a lightcurve

        :param lightcurve: The unbinned lightcurve
        :type lightcurve: pandas.DataFrame
        :return: the binned lightcurve
        :rtype: pd.DataFrame
        """
        raise NotImplementedError

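    # A concrete subclass has to provide the binning, e.g. (hypothetical sketch, not the
    # scheme used by the shipped subclasses): group the single exposures into crude visits
    # by integer MJD and average them.
    #
    #   class MyWISEData(WISEDataBase):
    #       def bin_lightcurve(self, lightcurve):
    #           groups = lightcurve.groupby(lightcurve["mjd"].astype(int))
    #           binned = groups[["mjd", "W1_mag", "W2_mag"]].mean()
    #           return binned.reset_index(drop=True)
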
1228
    # ----------------------------------------------------------------------
1229
    #     bin lightcurves
1230
    # ----------------------------------------------------------------------
1231

1232
    # ----------------------------------------------------------------------------------- #
1233
    # START converting to flux densities                   #
1234
    # ---------------------------------------------------- #
1235

1236
    def find_color_correction(self, w1_minus_w2):
1✔
1237
        """
1238
        Find the color correction based on the W1-W2 color.
1239
        See `this <https://wise2.ipac.caltech.edu/docs/release/allsky/expsup/sec4_4h.html#conv2flux>`_
1240

1241
        :param w1_minus_w2:
1242
        :type w1_minus_w2: float
1243
        :return: the color correction factor
1244
        :rtype: float
1245
        """
1246
        w1_minus_w2 = np.atleast_1d(w1_minus_w2)
×
1247
        c = pd.DataFrame(columns=self.magnitude_zeropoints_corrections.columns)
×
1248
        power_law_values = self.magnitude_zeropoints_corrections.loc[8:16]['[W1 - W2]']
×
1249
        for w1mw2 in w1_minus_w2:
×
1250
            dif = power_law_values - w1mw2
×
1251
            i = abs(dif).idxmin()  # label-based lookup so that .loc below picks the matching row
×
1252
            c = pd.concat([c, self.magnitude_zeropoints_corrections.loc[[i]]])  # DataFrame.append was removed in pandas 2
×
1253
        return c
×
1254
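
    # Example (illustrative; `wd` is an instantiated subclass, and it is assumed that the
    # correction table exposes the 'f_c(W1)' / 'f_c(W2)' columns used by `vegamag_to_flux_density`):
    #
    #     f_c = wd.find_color_correction(np.array([0.3, 0.8]))
    #     f_c["f_c(W1)"]   # correction factors matched to the tabulated W1 - W2 colours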

1255
    def vegamag_to_flux_density(self, vegamag, band, unit='mJy', color_correction=None):
1✔
1256
        """
1257
        This converts the detector level brightness m in Mag_vega to a flux density F
1258

1259
                    F = (F_nu / f_c) * 10 ^ (-m / 2.5)
1260

1261
        where F_nu is the zeropoint flux for the corresponding band and f_c a color correction factor.
1262
        See `this <https://wise2.ipac.caltech.edu/docs/release/allsky/expsup/sec4_4h.html#conv2flux>`_
1263

1264
        :param vegamag:
1265
        :type vegamag: float or numpy.ndarray
1266
        :param band:
1267
        :type band: str
1268
        :param unit: unit to convert the flux density to
1269
        :type unit: str
1270
        :param color_correction: the color correction factor; if dict-like, the keys have to be of the form 'f_c(W1)'
1271
        :type color_correction: float or numpy.ndarray or dict
1272
        :return: the flux densities
1273
        :rtype: ndarray
1274
        """
1275
        if not isinstance(color_correction, type(None)):
1✔
1276
            key = f'f_c({band})'
×
1277
            if key in color_correction:
×
1278
                color_correction = color_correction[key]
×
1279
                if len(color_correction) != len(vegamag):
×
1280
                    raise ValueError(f"\nLength of color corrections: {len(color_correction)}:\n{color_correction}; "
×
1281
                                     f"\nLength of mags: {len(vegamag)}: \n{vegamag}")
1282
            else:
1283
                raise NotImplementedError(color_correction)
1284

1285
        else:
1286
            color_correction = 1
1✔
1287

1288
        color_correction = np.array(color_correction)
1✔
1289
        vegamag = np.array(vegamag)
1✔
1290
        fd = self.magnitude_zeropoints['F_nu'][band].to(unit).value / color_correction * 10 ** (-vegamag / 2.5)
1✔
1291
        if len(fd) != len(vegamag):
1✔
1292
            raise ValueError(f"\nLength of flux densities: {len(fd)}:\n{fd}; "
×
1293
                             f"\nLength of mags: {len(vegamag)}: \n{vegamag}")
1294

1295
        return np.array(list(fd))
1✔
1296
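
    # Example (illustrative; `wd` is an instantiated subclass, the magnitudes are made up).
    # Asymmetric flux-density errors follow from converting the +-1 sigma magnitudes separately:
    #
    #     mags = np.array([15.0, 15.5])
    #     emags = np.array([0.05, 0.08])
    #     fd = wd.vegamag_to_flux_density(mags, band="W1", unit="mJy")
    #     efd = (wd.vegamag_to_flux_density(mags - emags, band="W1")
    #            - wd.vegamag_to_flux_density(mags + emags, band="W1"))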

1297
    def add_flux_density(self, lightcurve,
1✔
1298
                         mag_key, emag_key, mag_ul_key,
1299
                         f_key, ef_key, f_ul_key, do_color_correction=False):
1300
        """Adds flux densities to a lightcurve
1301

1302
        :param lightcurve:
1303
        :type lightcurve: pandas.DataFrame
1304
        :param mag_key: the key in `lightcurve` that holds the magnitude
1305
        :type mag_key: str
1306
        :param emag_key: the key in `lightcurve` that holds the error of the magnitude
1307
        :type emag_key: str
1308
        :param mag_ul_key: the key in `lightcurve` that holds the upper limit for the magnitude
1309
        :type mag_ul_key: str
1310
        :param f_key: the key that will hold the flux density
1311
        :type f_key: str
1312
        :param ef_key: the key that will hold the flux density error
1313
        :type ef_key: str
1314
        :param f_ul_key: the key that will hold the flux density upper limit
1315
        :type f_ul_key: str
1316
        :param do_color_correction:
1317
        :type do_color_correction: bool
1318
        :return: the lightcurve with flux density
1319
        :rtype: pandas.DataFrame
1320
        """
1321

1322
        if isinstance(lightcurve, dict):
1✔
1323
            lightcurve = pd.DataFrame.from_dict(lightcurve, orient='columns')
1✔
1324

1325
        if do_color_correction:
1✔
1326
            w1_minus_w2 = lightcurve[f"W1{mag_key}"] - lightcurve[f"W2{mag_key}"]
×
1327
            f_c = self.find_color_correction(w1_minus_w2)
×
1328
        else:
1329
            f_c = None
1✔
1330

1331
        for b in self.bands:
1✔
1332
            mags = lightcurve[f'{b}{mag_key}']
1✔
1333
            emags = lightcurve[f'{b}{emag_key}']
1✔
1334

1335
            flux_densities = self.vegamag_to_flux_density(mags, band=b, color_correction=f_c)  # same correction as the error bounds below
1✔
1336
            upper_eflux_densities = self.vegamag_to_flux_density(mags - emags, band=b, color_correction=f_c)
1✔
1337
            lower_eflux_densities = self.vegamag_to_flux_density(mags + emags, band=b, color_correction=f_c)
1✔
1338
            eflux_densities = upper_eflux_densities - lower_eflux_densities
1✔
1339

1340
            lightcurve[f'{b}{f_key}']  = flux_densities
1✔
1341
            lightcurve[f'{b}{ef_key}'] = eflux_densities
1✔
1342
            if mag_ul_key:
1✔
1343
                lightcurve[f'{b}{f_ul_key}'] = lightcurve[f'{b}{mag_ul_key}']
1✔
1344

1345
        return lightcurve
1✔
1346
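
    # Example (illustrative): direct use on a small table.  The column suffixes are chosen for the
    # sketch and upper-limit handling is switched off by passing None for the upper-limit keys:
    #
    #     lc = pd.DataFrame({
    #         "W1_mag": [15.0, 15.1], "W1_mag_err": [0.05, 0.06],
    #         "W2_mag": [14.5, 14.6], "W2_mag_err": [0.05, 0.06],
    #     })
    #     lc = wd.add_flux_density(lc, mag_key="_mag", emag_key="_mag_err", mag_ul_key=None,
    #                              f_key="_flux_density", ef_key="_flux_density_err", f_ul_key=None)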

1347
    def add_flux_densities_to_saved_lightcurves(self, service):
1✔
1348
        """Adds flux densities to all downloaded lightcurves
1349

1350
        :param service: The service with which the lightcurves were downloaded
1351
        :type service: str
1352
        """
1353
        data_product = self.load_data_product(service=service)
1✔
1354
        for i, i_data_product in tqdm.tqdm(data_product.items(), desc='adding flux densities'):
1✔
1355
            data_product[i]["timewise_lightcurve"] = self.add_flux_density(
1✔
1356
                i_data_product["timewise_lightcurve"],
1357
                mag_key=f'{self.mean_key}{self.mag_key_ext}',
1358
                emag_key=f'{self.mag_key_ext}{self.rms_key}',
1359
                mag_ul_key=f'{self.mag_key_ext}{self.upper_limit_key}',
1360
                f_key=f'{self.mean_key}{self.flux_density_key_ext}',
1361
                ef_key=f'{self.flux_density_key_ext}{self.rms_key}',
1362
                f_ul_key=f'{self.flux_density_key_ext}{self.upper_limit_key}'
1363
            ).to_dict()
1364
        self._save_data_product(data_product, service=service, overwrite=True)
1✔
1365

1366
    # ---------------------------------------------------- #
1367
    # END converting to flux densities                     #
1368
    # ----------------------------------------------------------------------------------- #
1369

1370
    # ----------------------------------------------------------------------------------- #
1371
    # START converting to luminosity                       #
1372
    # ---------------------------------------------------- #
1373

1374
    def luminosity_from_flux_density(self, flux_density, band, distance=None, redshift=None,
1✔
1375
                                     unit='erg s-1', flux_density_unit='mJy'):
1376
        """
1377
        Converts a flux density into a luminosity
1378

1379
        :param flux_density:
1380
        :type flux_density: float or numpy.ndarray
1381
        :param band:
1382
        :type band: str
1383
        :param distance: distance to source, if not given will use luminosity distance from redshift
1384
        :type distance: astropy.Quantity
1385
        :param redshift: redshift to use when calculating luminosity distance
1386
        :type redshift: float
1387
        :param unit: unit in which to give the luminosity, default is erg s-1
1388
        :type unit: str or astropy.unit
1389
        :param flux_density_unit: unit in which the flux density is given, default is mJy
1390
        :type flux_density_unit: str or astropy.unit
1391
        :return: the resulting luminosities
1392
        :rtype: float or ndarray
1393
        """
1394

1395
        if not distance:
1✔
1396
            if not redshift:
1✔
1397
                raise ValueError('Either redshift or distance has to be given!')
×
1398
            else:
1399
                distance = Planck18.luminosity_distance(float(redshift))
1✔
1400

1401
        F_nu = np.array(flux_density) * u.Unit(flux_density_unit) * 4 * np.pi * distance ** 2
1✔
1402
        nu = constants.c / self.band_wavelengths[band]
1✔
1403
        luminosity = F_nu * nu
1✔
1404
        return luminosity.to(unit).value
1✔
1405
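
    # Example (illustrative): the conversion above is L = 4 * pi * d_L**2 * nu * F_nu with
    # nu = c / lambda_band; using the redshift branch with a made-up flux density of 1 mJy
    # (`wd` is an instantiated subclass):
    #
    #     lum = wd.luminosity_from_flux_density(1.0, band="W1", redshift=0.05, unit="erg s-1")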

1406
    def _add_luminosity(self, lightcurve, f_key, ef_key, f_ul_key, lum_key, elum_key, lum_ul_key, **lum_kwargs):
1✔
1407
        for band in self.bands:
1✔
1408
            fd = lightcurve[band + f_key]
1✔
1409
            fd_e = lightcurve[band + ef_key]
1✔
1410
            l = self.luminosity_from_flux_density(fd, band, **lum_kwargs)
1✔
1411
            el = self.luminosity_from_flux_density(fd_e, band, **lum_kwargs)
1✔
1412
            lightcurve[band + lum_key] = l
1✔
1413
            lightcurve[band + elum_key] = el
1✔
1414
            lightcurve[band + lum_ul_key] = lightcurve[band + f_ul_key]
1✔
1415
        return lightcurve
1✔
1416

1417
    def add_luminosity_to_saved_lightcurves(self, service, redshift_key=None, distance_key=None):
1✔
1418
        """Add luminosities to all lightcurves, calculated from flux densities and distance or redshift
1419

1420
        :param service: the service with which the lightcurves were downloaded
1421
        :type service: str
1422
        :param redshift_key: the key in the parent sample data frame that holds the redshift info
1423
        :type redshift_key: str
1424
        :param distance_key: the key in the parent sample data frame that holds the distance info
1425
        :type distance_key: str
1426
        """
1427

1428
        if (not redshift_key) and (not distance_key):
1✔
1429
            raise ValueError('Either distance key or redshift key has to be given!')
×
1430

1431
        data_product = self.load_data_product(service=service)
1✔
1432
        for i, i_data_product in tqdm.tqdm(data_product.items(), desc='adding luminosities'):
1✔
1433
            parent_sample_idx = int(i.split('_')[0])
1✔
1434
            info = self.parent_sample.df.loc[parent_sample_idx]
1✔
1435

1436
            if distance_key:
1✔
1437
                distance = info[distance_key]
×
1438
                redshift = None
×
1439
            else:
1440
                distance = None
1✔
1441
                redshift = info[redshift_key]
1✔
1442

1443
            data_product[i]["timewise_lightcurve"] = self._add_luminosity(
1✔
1444
                pd.DataFrame.from_dict(i_data_product["timewise_lightcurve"]),
1445
                f_key     = self.mean_key + self.flux_density_key_ext,
1446
                ef_key    = self.flux_density_key_ext + self.rms_key,
1447
                f_ul_key  = self.flux_density_key_ext + self.upper_limit_key,
1448
                lum_key   = self.mean_key + self.luminosity_key_ext,
1449
                elum_key  = self.luminosity_key_ext + self.rms_key,
1450
                lum_ul_key= self.luminosity_key_ext + self.upper_limit_key,
1451
                redshift  = redshift,
1452
                distance  = distance
1453
            ).to_dict()
1454
        self._save_data_product(data_product, service=service, overwrite=True)
1✔
1455
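
    # Example (illustrative): typical post-processing chain after downloading; the redshift column
    # name "z" is an assumption about the parent sample (`wd` is an instantiated subclass):
    #
    #     wd.add_flux_densities_to_saved_lightcurves(service="tap")
    #     wd.add_luminosity_to_saved_lightcurves(service="tap", redshift_key="z")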

1456
    # ---------------------------------------------------- #
1457
    # END converting to luminosity                         #
1458
    # ----------------------------------------------------------------------------------- #
1459

1460
    #################################
1461
    # END GET PHOTOMETRY DATA       #
1462
    ###########################################################################################################
1463

1464
    ###########################################################################################################
1465
    # START MAKE POSITIONAL MASK        #
1466
    #####################################
1467

1468
    @staticmethod
1✔
1469
    def calculate_position_mask(lightcurve):
1✔
1470
        """
1471
        Create a mask based on the positions of the single exposures.
1472
        Find the median position and select the 90% of datapoints closest to it.
1473
        Then, calculate the standard deviation of the separation from the median position and keep
1474
        all datapoints within five times that (with a floor of 0.2 arcsec).
1475

1476
        :param lightcurve: unstacked lightcurve
1477
        :type lightcurve: pd.DataFrame
1478
        :return: indices of the datapoints that fail the positional cut
1479
        :rtype: list
1480
        """
1481
        coords = SkyCoord(lightcurve.ra, lightcurve.dec, unit="deg")
1✔
1482

1483
        # we can use the standard median for Dec
1484
        med_offset_dec = np.median(coords.dec)
1✔
1485
        # We have to do a weighted median for RA
1486
        w = np.sin(np.deg2rad(coords.dec)) ** 2
1✔
1487
        sort_inds = np.argsort(coords.ra)
1✔
1488
        cum_w = np.cumsum(w[sort_inds])
1✔
1489
        cutoff = np.sum(w) / 2
1✔
1490
        med_offset_ra = coords.ra[sort_inds][cum_w >= cutoff][0]
1✔
1491
        med_pos = SkyCoord(med_offset_ra, med_offset_dec)
1✔
1492

1493
        # find the 90% closest datapoints
1494
        sep = coords.separation(med_pos)
1✔
1495
        sep90 = np.quantile(sep, 0.9)
1✔
1496
        sep_mask = sep < sep90
1✔
1497

1498
        # keep datapoints within 3 sigma
1499
        sig = max([np.std(sep[sep_mask]), 0.2*u.arcsec])
1✔
1500
        keep_mask = pd.Series(sep <= 5 * sig)
1✔
1501
        bad_indices = lightcurve.index[~keep_mask]
1✔
1502
        return list(bad_indices)
1✔
1503
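
    # Example (illustrative): a tight cluster of detections plus one detection offset by ~5 arcsec;
    # the offset row's index is expected in the returned list (with the 0.2 arcsec floor the cut
    # should sit at 1 arcsec here):
    #
    #     rng = np.random.default_rng(0)
    #     toy = pd.DataFrame({
    #         "ra": 10.0 + rng.normal(0, 0.1 / 3600, 20),
    #         "dec": 20.0 + rng.normal(0, 0.1 / 3600, 20),
    #     })
    #     toy.loc[20] = [10.0, 20.0 + 5 / 3600]
    #     WISEDataBase.calculate_position_mask(toy)   # expected: [20]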

1504
    def get_position_mask(self, service, chunk_number):
1✔
1505
        """
1506
        Get the position mask for a chunk
1507

1508
        :param service: The service that was used to download the data, one of `gator` or `tap`
1509
        :type service: str
1510
        :param chunk_number: chunk number
1511
        :type chunk_number: int
1512
        :returns: dictionary mapping parent sample indices to the datapoint indices to be masked
1513
        :rtype: dict
1514
        """
1515

1516
        logger.info(f"getting position masks for {service}, chunk {chunk_number}")
1✔
1517
        fn = os.path.join(self.cache_dir, "position_masks", f"{service}_chunk{chunk_number}.json")
1✔
1518

1519
        if not os.path.isfile(fn):
1✔
1520
            logger.debug(f"No file {fn}. Calculating position masks.")
1✔
1521

1522
            if service == "tap":
1✔
1523
                unbinned_lcs = self.get_unbinned_lightcurves(chunk_number)
1✔
1524
            elif service == "gator":
1✔
1525
                unbinned_lcs = self._get_unbinned_lightcurves_gator(chunk_number)
1✔
1526
            else:
1527
                raise ValueError(f"Service must be one of 'gator' or 'tap', not {service}!")
×
1528

1529
            position_masks = dict()
1✔
1530

1531
            for i in unbinned_lcs[self._tap_orig_id_key].unique():
1✔
1532
                lightcurve = unbinned_lcs[unbinned_lcs[self._tap_orig_id_key] == i]
1✔
1533
                bad_indices = self.calculate_position_mask(lightcurve)
1✔
1534
                if len(bad_indices) > 0:
1✔
1535
                    position_masks[str(i)] = bad_indices
1✔
1536

1537
            d = os.path.dirname(fn)
1✔
1538
            if not os.path.isdir(d):
1✔
1539
                os.makedirs(d, exist_ok=True)
1✔
1540

1541
            with open(fn, "w") as f:
1✔
1542
                json.dump(position_masks, f)
1✔
1543

1544
        else:
1545
            logger.debug(f"loading {fn}")
1✔
1546
            with open(fn, "r") as f:
1✔
1547
                position_masks = json.load(f)
1✔
1548

1549
        return position_masks
1✔
1550
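
    # Example (illustrative): the masks are cached as JSON under
    # <cache_dir>/position_masks/<service>_chunk<N>.json with the structure
    # {parent sample index: [bad datapoint indices]} (`wd` is an instantiated subclass):
    #
    #     masks = wd.get_position_mask(service="tap", chunk_number=0)
    #     masks.get("1234", [])   # datapoint indices to drop for the (made-up) source 1234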

1551
    #####################################
1552
    # END MAKE POSITIONAL MASK          #
1553
    ###########################################################################################################
1554

1555
    ###########################################################################################################
1556
    # START MAKE PLOTTING FUNCTIONS     #
1557
    #####################################
1558

1559
    def plot_lc(self, parent_sample_idx, service='tap', plot_unbinned=False, plot_binned=True,
1✔
1560
                interactive=False, fn=None, ax=None, save=True, lum_key='flux_density', **kwargs):
1561
        """Make a pretty plot of a lightcurve
1562

1563
        :param parent_sample_idx: The index in the parent sample of the lightcurve
1564
        :type parent_sample_idx: int
1565
        :param service: the service with which the lightcurves were downloaded
1566
        :type service: str
1567
        :param plot_unbinned: plot unbinned data
1568
        :type plot_unbinned: bool
1569
        :param plot_binned: plot binned lightcurve
1570
        :type plot_binned: bool
1571
        :param interactive: interactive mode
1572
        :type interactive: bool
1573
        :param fn: filename, defaults to </path/to/timewise/data/dir>/output/plots/<base_name>/<parent_sample_index>_<lum_key>.pdf
1574
        :type fn: str
1575
        :param ax: pre-existing matplotlib.Axis
1576
        :param save: save the plot
1577
        :type save: bool
1578
        :param lum_key: the unit of luminosity to use in the plot, one of 'mag', 'flux_density' or 'luminosity'
1579
        :param kwargs: any additional kwargs will be passed on to `matplotlib.pyplot.subplots()`
1580
        :return: the matplotlib.Figure and matplotlib.Axes if `interactive=True`
1581
        """
1582

1583
        logger.debug(f"loading binned lightcurves")
1✔
1584
        data_product = self.load_data_product(service)
1✔
1585
        _get_unbinned_lcs_fct = self.get_unbinned_lightcurves if service == 'tap' else self._get_unbinned_lightcurves_gator
1✔
1586

1587
        wise_id = self.parent_sample.df.loc[int(parent_sample_idx), self.parent_wise_source_id_key]
1✔
1588
        if isinstance(wise_id, float) and not np.isnan(wise_id):
1✔
1589
            wise_id = int(wise_id)
×
1590
        logger.debug(f"{wise_id} for {parent_sample_idx}")
1✔
1591

1592
        lc = pd.DataFrame.from_dict(data_product[str(int(parent_sample_idx))]["timewise_lightcurve"])
1✔
1593

1594
        if plot_unbinned:
1✔
1595
            _chunk_number = self._get_chunk_number(parent_sample_index=parent_sample_idx)
1✔
1596

1597
            if service == 'tap':
1✔
1598
                unbinned_lcs = self.get_unbinned_lightcurves(_chunk_number)
1✔
1599

1600
            else:
1601
                unbinned_lcs = self._get_unbinned_lightcurves_gator(_chunk_number)
1✔
1602

1603
            unbinned_lc = unbinned_lcs[unbinned_lcs[self._tap_orig_id_key] == int(parent_sample_idx)]
1✔
1604

1605
        else:
1606
            unbinned_lc = None
1✔
1607

1608
        _lc = lc if plot_binned else None
1✔
1609

1610
        if not fn:
1✔
1611
            fn = os.path.join(self.plots_dir, f"{parent_sample_idx}_{lum_key}.pdf")
1✔
1612

1613
        return self._plot_lc(lightcurve=_lc, unbinned_lc=unbinned_lc, interactive=interactive, fn=fn, ax=ax,
1✔
1614
                             save=save, lum_key=lum_key, **kwargs)
1615
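
    # Example (illustrative): plot a single (made-up) source; with `interactive=True` the figure
    # and axes are returned for further customisation (`wd` is an instantiated subclass):
    #
    #     fig, ax = wd.plot_lc(1234, service="tap", plot_unbinned=True, lum_key="mag",
    #                          interactive=True, save=False)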

1616
    def _plot_lc(self, lightcurve=None, unbinned_lc=None, interactive=False, fn=None, ax=None, save=True,
1✔
1617
                 lum_key='flux_density', **kwargs):
1618

1619
        if not ax:
1✔
1620
            fig, ax = plt.subplots(**kwargs)
1✔
1621
        else:
1622
            fig = plt.gcf()
1✔
1623

1624
        for b in self.bands:
1✔
1625
            try:
1✔
1626
                if not isinstance(lightcurve, type(None)):
1✔
1627
                    ul_mask = np.array(lightcurve[f"{b}_{lum_key}{self.upper_limit_key}"]).astype(bool)
1✔
1628
                    ax.errorbar(lightcurve.mean_mjd[~ul_mask], lightcurve[f"{b}{self.mean_key}_{lum_key}"][~ul_mask],
1✔
1629
                                yerr=lightcurve[f"{b}_{lum_key}{self.rms_key}"][~ul_mask],
1630
                                label=b, ls='', marker='s', c=self.band_plot_colors[b], markersize=4,
1631
                                markeredgecolor='k', ecolor='k', capsize=2)
1632
                    ax.scatter(lightcurve.mean_mjd[ul_mask], lightcurve[f"{b}{self.mean_key}_{lum_key}"][ul_mask],
1✔
1633
                               marker='v', c=self.band_plot_colors[b], alpha=0.7, s=2)
1634

1635
                if not isinstance(unbinned_lc, type(None)):
1✔
1636
                    m = ~unbinned_lc[f"{b}_{lum_key}"].isna()
1✔
1637
                    ul_mask = unbinned_lc[f"{b}_{lum_key}{self.error_key_ext}"].isna()
1✔
1638

1639
                    tot_m = m & ~ul_mask
1✔
1640
                    if np.any(tot_m):
1✔
1641
                        ax.errorbar(unbinned_lc.mjd[tot_m], unbinned_lc[f"{b}_{lum_key}"][tot_m],
1✔
1642
                                    yerr=unbinned_lc[f"{b}_{lum_key}{self.error_key_ext}"][tot_m],
1643
                                    label=f"{b} unbinned", ls='', marker='o', c=self.band_plot_colors[b], markersize=4,
1644
                                    alpha=0.3)
1645

1646
                    single_ul_m = m & ul_mask
1✔
1647
                    if np.any(single_ul_m):
1✔
1648
                        label = f"{b} unbinned upper limits" if not np.any(tot_m) else ""
×
1649
                        ax.scatter(unbinned_lc.mjd[single_ul_m], unbinned_lc[f"{b}_{lum_key}"][single_ul_m],
×
1650
                                   marker="d", c=self.band_plot_colors[b], alpha=0.3, s=1, label=label)
1651

1652
            except KeyError as e:
×
1653
                raise KeyError(f"Could not find brightness key {e}!")
×
1654

1655
        if lum_key == 'mag':
1✔
1656
            ylim = ax.get_ylim()
1✔
1657
            ax.set_ylim([ylim[-1], ylim[0]])
1✔
1658

1659
        ax.set_xlabel('MJD')
1✔
1660
        ax.set_ylabel(lum_key)
1✔
1661
        ax.legend()
1✔
1662

1663
        if save:
1✔
1664
            logger.debug(f"saving under {fn}")
1✔
1665
            fig.savefig(fn)
1✔
1666

1667
        if interactive:
1✔
1668
            return fig, ax
×
1669
        else:
1670
            plt.close()
1✔
1671

1672
    #####################################
1673
    #  END MAKE PLOTTING FUNCTIONS      #
1674
    ###########################################################################################################
1675

1676
    ###########################################################################################################
1677
    #  START CALCULATE METADATA         #
1678
    #####################################
1679

1680
    def calculate_metadata(self, service, chunk_number=None, jobID=None, overwrite=True):
1✔
1681
        """Calculates the metadata for all downloaded lightcurves.
1682
         Results will be saved under
1683

1684
            </path/to/timewise/data/dir>/output/<base_name>/lightcurves/metadata_<service>.json
1685

1686
        :param service: the service with which the lightcurves were downloaded
1687
        :type service: str
1688
        :param chunk_number: the chunk number to use, default uses all chunks
1689
        :type chunk_number: int
1690
        :param jobID:  the job ID to use, default uses all lightcurves
1691
        :type jobID: int
1692
        :param overwrite: overwrite existing metadata file
1693
        :type overwrite: bool
1694
        """
1695
        data_product = self.load_data_product(service, chunk_number, jobID)
1✔
1696
        for ID, i_data_product in tqdm.tqdm(data_product.items(), desc="calculating metadata"):
1✔
1697
            if "timewise_lightcurve" in i_data_product:
1✔
1698
                lc = pd.DataFrame.from_dict(i_data_product["timewise_lightcurve"])
1✔
1699
                metadata = self.calculate_metadata_single(lc)
1✔
1700
                data_product[ID]["timewise_metadata"] = metadata
1✔
1701

1702
        self._save_data_product(data_product, service, chunk_number, jobID, overwrite=overwrite)
1✔
1703
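
    # Example (illustrative): metadata are stored next to the binned lightcurves in the data
    # product and can be read back after the calculation (the index is made up, `wd` is an
    # instantiated subclass):
    #
    #     wd.calculate_metadata(service="tap")
    #     data_product = wd.load_data_product(service="tap")
    #     data_product["1234"]["timewise_metadata"]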

1704
    @abc.abstractmethod
1✔
1705
    def calculate_metadata_single(self, lcs):
1✔
1706
        """
1707
        Calculates some properties of the lightcurves
1708

1709
        :param lcs: the binned lightcurve
1710
        :type lcs: pandas.DataFrame
1711
        """
1712
        raise NotImplementedError
1713

1714
    #####################################
1715
    #  END CALCULATE METADATA           #
1716
    ###########################################################################################################