• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TOMToolkit / tom_base / 6713509976

31 Oct 2023 11:13PM UTC coverage: 86.773% (+0.7%) from 86.072%
6713509976

push

github-actions

web-flow
Merge pull request #699 from TOMToolkit/dev

Multi-Feature Merge. Please Review Carefully.

795 of 795 new or added lines in 39 files covered. (100.0%)

8253 of 9511 relevant lines covered (86.77%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.06
/tom_dataproducts/alertstreams/hermes.py
1
import logging
1✔
2
from dateutil.parser import parse
1✔
3

4
from django.conf import settings
1✔
5

6
# from hop.io import Metadata
7

8
from tom_alerts.models import AlertStreamMessage
1✔
9
from tom_targets.models import Target, TargetList
1✔
10
from tom_dataproducts.models import ReducedDatum
1✔
11

12
import requests
1✔
13

14
logger = logging.getLogger(__name__)
1✔
15
logger.setLevel(logging.DEBUG)
1✔
16

17

18
class BuildHermesMessage(object):
1✔
19
    """
20
    A HERMES Message Object that can be submitted to HOP through HERMES
21
    """
22
    def __init__(self, title='', submitter='', authors='', message='', topic='hermes.test', **kwargs):
1✔
23
        self.title = title
×
24
        self.submitter = submitter
×
25
        self.authors = authors
×
26
        self.message = message
×
27
        self.topic = topic
×
28
        self.extra_info = kwargs
×
29

30

31
def publish_photometry_to_hermes(message_info, datums, **kwargs):
1✔
32
    """
33
    Submits a typical hermes photometry alert using the datums supplied to build a photometry table.
34
    -- Stores an AlertStreamMessage connected to each datum to show that the datum has previously been shared.
35
    :param message_info: HERMES Message Object created with BuildHermesMessage
36
    :param datums: Queryset of Reduced Datums to be built into table.
37
    :return: response
38
    """
39
    stream_base_url = settings.DATA_SHARING['hermes']['BASE_URL']
×
40
    submit_url = stream_base_url + 'api/v0/' + 'submit_message/'
×
41
    # You will need your Hermes API key. This can be found on your Hermes profile page.
42
    headers = {'Authorization': f"Token {settings.DATA_SHARING['hermes']['HERMES_API_KEY']}"}
×
43

44
    hermes_photometry_data = []
×
45
    hermes_target_list = []
×
46
    hermes_alert = AlertStreamMessage(topic=message_info.topic, exchange_status='published')
×
47
    hermes_alert.save()
×
48
    for tomtoolkit_photometry in datums:
×
49
        if tomtoolkit_photometry.target.name not in [target['name'] for target in hermes_target_list]:
×
50
            hermes_target_list.append(create_hermes_target_table_row(tomtoolkit_photometry.target, **kwargs))
×
51
        tomtoolkit_photometry.message.add(hermes_alert)
×
52
        hermes_photometry_data.append(create_hermes_phot_table_row(tomtoolkit_photometry, **kwargs))
×
53
    alert = {
×
54
        'topic': message_info.topic,
55
        'title': message_info.title,
56
        'submitter': message_info.submitter,
57
        'authors': message_info.authors,
58
        'data': {
59
            'targets': hermes_target_list,
60
            'photometry': hermes_photometry_data,
61
            'extra_data': message_info.extra_info
62
        },
63
        'message_text': message_info.message,
64
    }
65

66
    response = requests.post(url=submit_url, json=alert, headers=headers)
×
67
    return response
×
68

69

70
def create_hermes_target_table_row(target, **kwargs):
1✔
71
    """Build a row for a Hermes Target Table from a TOM target Model.
72
    """
73
    if target.type == "SIDEREAL":
×
74
        target_table_row = {
×
75
            'name': target.name,
76
            'ra': target.ra,
77
            'dec': target.dec,
78
        }
79
        if target.epoch:
×
80
            target_table_row['epoch'] = target.epoch
×
81
        if target.pm_ra:
×
82
            target_table_row['pm_ra'] = target.pm_ra
×
83
        if target.pm_dec:
×
84
            target_table_row['pm_dec'] = target.pm_dec
×
85
    else:
86
        target_table_row = {
×
87
            'name': target.name,
88
            'orbital_elements': {
89
                "epoch_of_elements": target.epoch_of_elements,
90
                "eccentricity": target.eccentricity,
91
                "argument_of_the_perihelion": target.arg_of_perihelion,
92
                "mean_anomaly": target.mean_anomaly,
93
                "orbital_inclination": target.inclination,
94
                "longitude_of_the_ascending_node": target.lng_asc_node,
95
                "semimajor_axis": target.semimajor_axis,
96
                "epoch_of_perihelion": target.epoch_of_perihelion,
97
                "perihelion_distance": target.perihdist,
98
            }
99
        }
100
    target_table_row['aliases'] = [alias.name for alias in target.aliases.all()]
×
101
    return target_table_row
×
102

103

104
def create_hermes_phot_table_row(datum, **kwargs):
1✔
105
    """Build a row for a Hermes Photometry Table using a TOM Photometry datum
106
    """
107
    phot_table_row = {
×
108
        'target_name': datum.target.name,
109
        'date_obs': datum.timestamp.isoformat(),
110
        'telescope': datum.value.get('telescope', ''),
111
        'instrument': datum.value.get('instrument', ''),
112
        'bandpass': datum.value.get('filter', ''),
113
        'brightness_unit': datum.value.get('unit', 'AB mag'),
114
    }
115
    if datum.value.get('magnitude', None):
×
116
        phot_table_row['brightness'] = datum.value['magnitude']
×
117
    else:
118
        phot_table_row['limiting_brightness'] = datum.value.get('limit', None)
×
119
    if datum.value.get('magnitude_error', None):
×
120
        phot_table_row['brightness_error'] = datum.value['magnitude_error']
×
121
    return phot_table_row
×
122

123

124
def get_hermes_topics(**kwargs):
1✔
125
    """
126
    Method to retrieve a list of available topics from HOP.
127
    Intended to be called from forms when building topic list.
128
    Extend this method to restrict topics for individual users.
129
    :return: List of writable topics available for TOM.
130
    """
131
    try:
×
132
        stream_base_url = settings.DATA_SHARING['hermes']['BASE_URL']
×
133
        submit_url = stream_base_url + "api/v0/profile/"
×
134
        headers = {'Authorization': f"Token {settings.DATA_SHARING['hermes']['HERMES_API_KEY']}"}
×
135

136
        response = requests.get(url=submit_url, headers=headers)
×
137

138
        topics = response.json()['writable_topics']
×
139
    except (KeyError, requests.exceptions.JSONDecodeError):
×
140
        topics = settings.DATA_SHARING['hermes']['USER_TOPICS']
×
141
    return topics
×
142

143

144
def hermes_alert_handler(alert, metadata):
1✔
145
    """Example Alert Handler to record data streamed through Hermes as a new ReducedDatum.
146
    -- Only Reads Photometry Data
147
    -- Only ingests Data if exact match for Target Name
148
    -- Does not Ingest Data if exact match already exists
149
    -- Requires 'tom_alertstreams' in settings.INSTALLED_APPS
150
    -- Requires ALERT_STREAMS['topic_handlers'] in settings
151
    """
152
    alert_as_dict = alert.content
×
153
    photometry_table = alert_as_dict['data'].get('photometry', None)
×
154
    # target_table = alert_as_dict['data'].get('targets', None)
155
    if photometry_table:
×
156
        hermes_alert = AlertStreamMessage(topic=alert_as_dict['topic'],
×
157
                                          exchange_status='ingested',
158
                                          message_id=alert_as_dict.get("uuid", None))
159
        target_name = ''
×
160
        query = []
×
161
        for row in photometry_table:
×
162
            if row['target_name'] != target_name:
×
163
                target_name = row['target_name']
×
164
                query = Target.matches.check_for_fuzzy_match(target_name)
×
165
            if query:
×
166
                target = query[0]
×
167
            else:
168
                # add conditional statements for whether to ingest a target here.
169
                # target = create_new_hermes_target(target_table, target_name, target_list_name="new_hermes_object")
170
                continue
×
171

172
            try:
×
173
                obs_date = parse(row['date_obs'])
×
174
            except ValueError:
×
175
                continue
×
176

177
            datum = {
×
178
                'target': target,
179
                'data_type': 'photometry',
180
                'source_name': alert_as_dict['topic'],
181
                'source_location': 'Hermes via HOP',  # TODO Add message URL here once message ID's exist
182
                'timestamp': obs_date,
183
                'value': get_hermes_phot_value(row)
184
            }
185
            new_rd, created = ReducedDatum.objects.get_or_create(**datum)
×
186
            if created:
×
187
                hermes_alert.save()
×
188
                new_rd.message.add(hermes_alert)
×
189
                new_rd.save()
×
190

191

192
def get_hermes_phot_value(phot_data):
1✔
193
    """
194
    Convert Hermes Message format for a row of Photometry table into parameters accepted by the Reduced Datum model
195
    :param phot_data: Dictionary containing Hermes Photometry table.
196
    :return: Dictionary containing properly formatted parameters for Reduced_Datum
197
    """
198
    data_dictionary = {
×
199
        'magnitude_error': phot_data.get('brightness_error', ''),
200
        'filter': phot_data['bandpass'],
201
        'telescope': phot_data.get('telescope', ''),
202
        'instrument': phot_data.get('instrument', ''),
203
        'unit': phot_data['brightness_unit'],
204
    }
205

206
    if phot_data.get('brightness', None):
×
207
        data_dictionary['magnitude'] = phot_data['brightness']
×
208
    elif phot_data.get('limiting_brightness', None):
×
209
        data_dictionary['limit'] = phot_data['limiting_brightness']
×
210

211
    return data_dictionary
×
212

213

214
def create_new_hermes_target(target_table, target_name=None, target_list_name=None):
1✔
215
    """
216
    Ingest a target into your TOM from Hermes.
217
    Takes a target_table and a target_name. If no target name is given, every target on the target table will be
218
    ingested.
219
    :param target_table: Hermes Target table from a Hermes Message
220
    :param target_name: Name for individual target to ingest from target table.
221
    :param target_list_name: Name of TargetList within which new target should be placed.
222
    :return:
223
    """
224
    target = None
×
225
    for hermes_target in target_table:
×
226
        if target_name == hermes_target['name'] or target_name is None:
×
227

228
            new_target = {"name": hermes_target.pop('name')}
×
229
            if "ra" in hermes_target and "dec" in hermes_target:
×
230
                new_target['type'] = 'SIDEREAL'
×
231
                new_target['ra'] = hermes_target.pop('ra')
×
232
                new_target['dec'] = hermes_target.pop('dec')
×
233
                new_target['pm_ra'] = hermes_target.pop('pm_ra', None)
×
234
                new_target['pm_dec'] = hermes_target.pop('pm_dec', None)
×
235
                new_target['epoch'] = hermes_target.pop('epoch', None)
×
236
            elif "orbital_elements" in hermes_target:
×
237
                orbital_elements = hermes_target.pop('orbital_elements')
×
238
                new_target['type'] = 'NON_SIDEREAL'
×
239
                new_target['epoch_of_elements'] = orbital_elements.pop('epoch_of_elements', None)
×
240
                new_target['mean_anomaly'] = orbital_elements.pop('mean_anomaly', None)
×
241
                new_target['arg_of_perihelion'] = orbital_elements.pop('argument_of_the_perihelion', None)
×
242
                new_target['eccentricity'] = orbital_elements.pop('eccentricity', None)
×
243
                new_target['lng_asc_node'] = orbital_elements.pop('longitude_of_the_ascending_node', None)
×
244
                new_target['inclination'] = orbital_elements.pop('orbital_inclination', None)
×
245
                new_target['semimajor_axis'] = orbital_elements.pop('semimajor_axis', None)
×
246
                new_target['epoch_of_perihelion'] = orbital_elements.pop('epoch_of_perihelion', None)
×
247
                new_target['perihdist'] = orbital_elements.pop('perihelion_distance', None)
×
248
            aliases = hermes_target.pop('aliases', [])
×
249
            target = Target(**new_target)
×
250
            target.full_clean()
×
251
            target.save(names=aliases, extras=hermes_target)
×
252
            if target_list_name:
×
253
                target_list, created = TargetList.objects.get_or_create(name=target_list_name)
×
254
                if created:
×
255
                    logger.debug(f'New target_list created: {target_list_name}')
×
256
                target_list.targets.add(target)
×
257
    return target
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc