• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TOMToolkit / tom_base / 6713509976

31 Oct 2023 11:13PM UTC coverage: 86.773% (+0.7%) from 86.072%
6713509976

push

github-actions

web-flow
Merge pull request #699 from TOMToolkit/dev

Multi-Feature Merge. Please Review Carefully.

795 of 795 new or added lines in 39 files covered. (100.0%)

8253 of 9511 relevant lines covered (86.77%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.15
/tom_dataproducts/sharing.py
1
import requests
1✔
2
import os
1✔
3

4
from django.conf import settings
1✔
5
from django.core.exceptions import ImproperlyConfigured
1✔
6
from django.contrib import messages
1✔
7

8
from tom_targets.models import Target
1✔
9
from tom_dataproducts.models import DataProduct, ReducedDatum
1✔
10
from tom_dataproducts.alertstreams.hermes import publish_photometry_to_hermes, BuildHermesMessage, get_hermes_topics
1✔
11
from tom_dataproducts.serializers import DataProductSerializer, ReducedDatumSerializer
1✔
12

13

14
def share_data_with_hermes(share_destination, form_data, product_id=None, target_id=None, selected_data=None):
1✔
15
    """
16
    Serialize and share data with Hermes (hermes.lco.global)
17
    :param share_destination: Topic to share data to. (e.g. 'hermes.test')
18
    :param form_data: Sharing Form data
19
    :param product_id: DataProduct ID (if provided)
20
    :param target_id: Target ID (if provided)
21
    :param selected_data: List of ReducedDatum IDs (if provided)
22
    :return:
23
    """
24
    # Query relevant Reduced Datums Queryset
25
    accepted_data_types = ['photometry']
×
26
    if product_id:
×
27
        product = DataProduct.objects.get(pk=product_id)
×
28
        target = product.target
×
29
        reduced_datums = ReducedDatum.objects.filter(data_product=product)
×
30
    elif selected_data:
×
31
        reduced_datums = ReducedDatum.objects.filter(pk__in=selected_data)
×
32
        target = reduced_datums[0].target
×
33
    elif target_id:
×
34
        target = Target.objects.get(pk=target_id)
×
35
        data_type = form_data.get('data_type', 'photometry')
×
36
        reduced_datums = ReducedDatum.objects.filter(target=target, data_type=data_type)
×
37
    else:
38
        reduced_datums = ReducedDatum.objects.none()
×
39
        target = Target.objects.none()
×
40

41
    reduced_datums.filter(data_type__in=accepted_data_types)
×
42

43
    # Build and submit hermes table from Reduced Datums
44
    hermes_topic = share_destination.split(':')[1]
×
45
    destination = share_destination.split(':')[0]
×
46
    message_info = BuildHermesMessage(title=form_data.get('share_title',
×
47
                                                          f"Updated data for {target.name} from "
48
                                                          f"{getattr(settings, 'TOM_NAME', 'TOM Toolkit')}."),
49
                                      submitter=form_data.get('submitter'),
50
                                      authors=form_data.get('share_authors', None),
51
                                      message=form_data.get('share_message', None),
52
                                      topic=hermes_topic
53
                                      )
54
    # Run ReducedDatums Queryset through sharing protocols to make sure they are safe to share.
55
    filtered_reduced_datums = check_for_share_safe_datums(destination, reduced_datums, topic=hermes_topic)
×
56
    if filtered_reduced_datums.count() > 0:
×
57
        response = publish_photometry_to_hermes(message_info, filtered_reduced_datums)
×
58
    else:
59
        return {'message': f'ERROR: No valid data to share. (Check Sharing Protocol. Note that data types must be in '
×
60
                           f'{accepted_data_types})'}
61
    return response
×
62

63

64
def share_data_with_tom(share_destination, form_data, product_id=None, target_id=None, selected_data=None):
1✔
65
    """
66
    Serialize and share data with another TOM
67
    :param share_destination: TOM to share data to as described in settings.DATA_SHARING. (e.g. 'mytom')
68
    :param form_data: Sharing Form data
69
    :param product_id: DataProduct ID (if provided)
70
    :param target_id: Target ID (if provided)
71
    :param selected_data: List of ReducedDatum IDs (if provided)
72
    :return:
73
    """
74
    # Build destination TOM headers and URL information
75
    try:
1✔
76
        destination_tom_base_url = settings.DATA_SHARING[share_destination]['BASE_URL']
1✔
77
        username = settings.DATA_SHARING[share_destination]['USERNAME']
1✔
78
        password = settings.DATA_SHARING[share_destination]['PASSWORD']
1✔
79
    except KeyError as err:
×
80
        raise ImproperlyConfigured(f'Check DATA_SHARING configuration for {share_destination}: Key {err} not found.')
×
81
    auth = (username, password)
1✔
82
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
1✔
83

84
    dataproducts_url = destination_tom_base_url + 'api/dataproducts/'
1✔
85
    targets_url = destination_tom_base_url + 'api/targets/'
1✔
86
    reduced_datums_url = destination_tom_base_url + 'api/reduceddatums/'
1✔
87
    reduced_datums = ReducedDatum.objects.none()
1✔
88

89
    # If a DataProduct is provided, share that DataProduct
90
    if product_id:
1✔
91
        product = DataProduct.objects.get(pk=product_id)
1✔
92
        target = product.target
1✔
93
        serialized_data = DataProductSerializer(product).data
1✔
94
        # Find matching target in destination TOM
95
        destination_target_id, _ = get_destination_target(target, targets_url, headers, auth)
1✔
96
        if destination_target_id is None:
1✔
97
            return {'message': 'ERROR: No matching target found.'}
1✔
98
        elif isinstance(destination_target_id, list) and len(destination_target_id) > 1:
1✔
99
            return {'message': 'ERROR: Multiple targets with matching name found in destination TOM.'}
×
100
        serialized_data['target'] = destination_target_id
1✔
101
        # TODO: this should be updated when tom_dataproducts is updated to use django.core.storage
102
        dataproduct_filename = os.path.join(settings.MEDIA_ROOT, product.data.name)
1✔
103
        # Save DataProduct in Destination TOM
104
        with open(dataproduct_filename, 'rb') as dataproduct_filep:
1✔
105
            files = {'file': (product.data.name, dataproduct_filep, 'text/csv')}
1✔
106
            headers = {'Media-Type': 'multipart/form-data'}
1✔
107
            response = requests.post(dataproducts_url, data=serialized_data, files=files, headers=headers, auth=auth)
1✔
108
    elif selected_data or target_id:
1✔
109
        # If ReducedDatums are provided, share those ReducedDatums
110
        if selected_data:
1✔
111
            reduced_datums = ReducedDatum.objects.filter(pk__in=selected_data)
1✔
112
            targets = set(reduced_datum.target for reduced_datum in reduced_datums)
1✔
113
            target_dict = {}
1✔
114
            for target in targets:
1✔
115
                # get destination Target
116
                destination_target_id, _ = get_destination_target(target, targets_url, headers, auth)
1✔
117
                if isinstance(destination_target_id, list) and len(destination_target_id) > 1:
1✔
118
                    return {'message': 'ERROR: Multiple targets with matching name found in destination TOM.'}
×
119
                target_dict[target.name] = destination_target_id
1✔
120
            if all(value is None for value in target_dict.values()):
1✔
121
                return {'message': 'ERROR: No matching targets found.'}
1✔
122
        else:
123
            # If Target is provided, share all ReducedDatums for that Target
124
            # (Will not create New Target in Destination TOM)
125
            target = Target.objects.get(pk=target_id)
1✔
126
            reduced_datums = ReducedDatum.objects.filter(target=target)
1✔
127
            destination_target_id, _ = get_destination_target(target, targets_url, headers, auth)
1✔
128
            if destination_target_id is None:
1✔
129
                return {'message': 'ERROR: No matching target found.'}
1✔
130
            elif isinstance(destination_target_id, list) and len(destination_target_id) > 1:
1✔
131
                return {'message': 'ERROR: Multiple targets with matching name found in destination TOM.'}
×
132
            target_dict = {target.name:  destination_target_id}
1✔
133
        response_codes = []
1✔
134
        reduced_datums = check_for_share_safe_datums(share_destination, reduced_datums)
1✔
135
        if not reduced_datums:
1✔
136
            return {'message': 'ERROR: No valid data to share.'}
×
137
        for datum in reduced_datums:
1✔
138
            if target_dict[datum.target.name]:
1✔
139
                serialized_data = ReducedDatumSerializer(datum).data
1✔
140
                serialized_data['target'] = target_dict[datum.target.name]
1✔
141
                serialized_data['data_product'] = ''
1✔
142
                if not serialized_data['source_name']:
1✔
143
                    serialized_data['source_name'] = settings.TOM_NAME
1✔
144
                    serialized_data['source_location'] = f"ReducedDatum shared from " \
1✔
145
                                                         f"<{settings.TOM_NAME}.url>/api/reduceddatums/{datum.id}/"
146
                response = requests.post(reduced_datums_url, json=serialized_data, headers=headers, auth=auth)
1✔
147
                response_codes.append(response.status_code)
1✔
148
        failed_data_count = len([rc for rc in response_codes if rc >= 300])
1✔
149
        if failed_data_count < len(response_codes):
1✔
150
            return {'message': f'{len(response_codes)-failed_data_count} of {len(response_codes)} '
1✔
151
                               'datums successfully saved.'}
152
        else:
153
            return {'message': 'ERROR: No valid data shared. These data may already exist in target TOM.'}
1✔
154
    else:
155
        return {'message': 'ERROR: No valid data to share.'}
×
156

157
    return response
1✔
158

159

160
def get_destination_target(target, targets_url, headers, auth):
1✔
161
    """
162
    Retrieve the target ID from a destination TOM that is a fuzzy match the given target name and aliases
163
    :param target: Target Model
164
    :param targets_url: Destination API URL for TOM Target List
165
    :param headers: TOM API headers
166
    :param auth: TOM API authorization
167
    :return:
168
    """
169
    # Create coma separated list of target names plus aliases that can be recognized and parsed by the TOM API Filter
170
    target_names = ','.join(map(str, target.names))
1✔
171
    target_response = requests.get(f'{targets_url}?name_fuzzy={target_names}', headers=headers, auth=auth)
1✔
172
    target_response_json = target_response.json()
1✔
173
    try:
1✔
174
        if target_response_json['results']:
1✔
175
            if len(target_response_json['results']) > 1:
1✔
176
                return target_response_json['results'], target_response
1✔
177
            destination_target_id = target_response_json['results'][0]['id']
1✔
178
            return destination_target_id, target_response
1✔
179
        else:
180
            return None, target_response
1✔
181
    except KeyError:
1✔
182
        return None, target_response
1✔
183

184

185
def check_for_share_safe_datums(destination, reduced_datums, **kwargs):
1✔
186
    """
187
    Custom sharing protocols used to determine when data is shared with a destination.
188
    This example prevents sharing if a datum has already been published to the given Hermes topic.
189
    :param destination: sharing destination string
190
    :param reduced_datums: selected input datums
191
    :return: queryset of reduced datums to be shared
192
    """
193
    return reduced_datums
1✔
194
    # if 'hermes' in destination:
195
    #     message_topic = kwargs.get('topic', None)
196
    #     # Remove data points previously shared to the given topic
197
    #     filtered_datums = reduced_datums.exclude(Q(message__exchange_status='published')
198
    #                                              & Q(message__topic=message_topic))
199
    # else:
200
    #     filtered_datums = reduced_datums
201
    # return filtered_datums
202

203

204
def check_for_save_safe_datums():
1✔
205
    return
×
206

207

208
def get_sharing_destination_options():
1✔
209
    """
210
    Build the Display options and headers for the dropdown form for choosing sharing topics.
211
    Customize for a different selection experience.
212
    :return: Tuple: Possible Destinations and their Display Names
213
    """
214
    choices = []
1✔
215
    try:
1✔
216
        for destination, details in settings.DATA_SHARING.items():
1✔
217
            new_destination = [details.get('DISPLAY_NAME', destination)]
1✔
218
            if details.get('USER_TOPICS', None):
1✔
219
                # If topics exist for a destination (Such as HERMES) give topics as sub-choices
220
                #   for non-selectable Destination
221
                if destination == "hermes":
×
222
                    destination_topics = get_hermes_topics()
×
223
                else:
224
                    destination_topics = details['USER_TOPICS']
×
225
                topic_list = [(f'{destination}:{topic}', topic) for topic in destination_topics]
×
226
                new_destination.append(tuple(topic_list))
×
227
            else:
228
                # Otherwise just use destination as option
229
                new_destination.insert(0, destination)
1✔
230
            choices.append(tuple(new_destination))
1✔
231
    except AttributeError:
1✔
232
        pass
1✔
233
    return tuple(choices)
1✔
234

235

236
def sharing_feedback_handler(response, request):
1✔
237
    """
238
    Handle the response from a sharing request and prepare a message to the user
239
    :return:
240
    """
241
    try:
1✔
242
        if 'message' in response.json():
1✔
243
            publish_feedback = response.json()['message']
1✔
244
        else:
245
            publish_feedback = f"ERROR: {response.text}"
1✔
246
    except AttributeError:
1✔
247
        publish_feedback = response['message']
1✔
248
    except ValueError:
×
249
        publish_feedback = f"ERROR: Returned Response code {response.status_code}"
×
250
    if "ERROR" in publish_feedback.upper():
1✔
251
        messages.error(request, publish_feedback)
1✔
252
    else:
253
        messages.success(request, publish_feedback)
1✔
254
    return
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc