• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HEPData / hepdata / 18575002960

16 Oct 2025 09:15PM UTC coverage: 84.457% (-0.006%) from 84.463%
18575002960

Pull #925

github

GraemeWatt
opensearch: endpoint URLs should start with 'http'

* Allow case where some analyses included locally and some via endpoint.
Pull Request #925: Add support for HS3 and SimpleAnalysis resource files

15 of 15 new or added lines in 4 files covered. (100.0%)

2 existing lines in 1 file now uncovered.

4684 of 5546 relevant lines covered (84.46%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.62
/hepdata/modules/records/utils/analyses.py
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of HEPData.
4
# Copyright (C) 2021 CERN.
5
#
6
# HEPData is free software; you can redistribute it
7
# and/or modify it under the terms of the GNU General Public License as
8
# published by the Free Software Foundation; either version 2 of the
9
# License, or (at your option) any later version.
10
#
11
# HEPData is distributed in the hope that it will be
12
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with HEPData; if not, write to the
18
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19
# MA 02111-1307, USA.
20
#
21
# In applying this license, CERN does not
22
# waive the privileges and immunities granted to it by virtue of its status
23
# as an Intergovernmental Organization or submit itself to any jurisdiction.
24

25
import logging
1✔
26
import os
1✔
27

28
from celery import shared_task
1✔
29
from flask import current_app
1✔
30
from invenio_db import db
1✔
31
from hepdata.resilient_requests import resilient_requests
1✔
32
import json
1✔
33
import jsonschema
1✔
34

35
from hepdata.ext.opensearch.api import index_record_ids
1✔
36
from hepdata.modules.submission.api import get_latest_hepsubmission, is_resource_added_to_submission
1✔
37
from hepdata.modules.submission.models import DataResource, HEPSubmission, data_reference_link
1✔
38
from hepdata.utils.users import get_user_from_id
1✔
39
from hepdata.modules.records.subscribers.rest import subscribe
1✔
40
from hepdata.modules.records.subscribers.api import is_current_user_subscribed_to_record
1✔
41
from hepdata.modules.records.utils.common import get_license
1✔
42

43
logging.basicConfig()
1✔
44
log = logging.getLogger(__name__)
1✔
45

46
def test_analyses_schema(json_file, schema_version="1.0.0"):
    """
    Validate analyses JSON content against the analyses schema of a given version.

    The schema is read from
    ``hepdata/templates/analyses_schema/<schema_version>/analyses_schema.json``.
    The path is relative, so it is resolved against the current working directory.

    NOTE(review): despite the ``test_`` prefix this is a runtime validator, not a
    unit test; the name presumably risks being collected by pytest if it is
    imported into a test module - confirm before renaming.

    :param json_file: parsed JSON content to validate (passed to jsonschema as the instance)
    :param schema_version: name of the schema version directory to load the schema from
    :raises jsonschema.exceptions.ValidationError: if ``json_file`` does not conform to the schema
    :raises OSError: if the schema file for ``schema_version`` cannot be opened
    """
    schema_path = os.path.join("hepdata", "templates", "analyses_schema", schema_version, "analyses_schema.json")
    # JSON files are UTF-8; be explicit rather than relying on the locale's default encoding.
    with open(schema_path, encoding="utf-8") as f:
        schema = json.load(f)
    jsonschema.validate(instance=json_file, schema=schema)
1✔
51

52
def _iter_analysis_entries(r_json, is_legacy):
    """
    Yield ``(inspire_id, entry)`` pairs from a validated analyses JSON document.

    :param r_json: decoded analyses JSON document
    :param is_legacy: True for schema version "0.1.0", where the document itself maps
        each record to its list of analyses; False for later versions, where the
        records are listed under the "analyses" key
    """
    if is_legacy:
        for record in r_json:
            yield record, r_json[record]
    else:
        for ana in r_json["analyses"]:
            # inspire_id is stored as a string in the database
            yield str(ana["inspire_id"]), ana


def _iter_resource_urls(entry, endpoint_config, r_json, is_legacy):
    """
    Yield the resource URL of every analysis implementation listed in ``entry``.

    Legacy documents format the endpoint's configured "url_template" with the analysis
    name; later versions format the document's own "main_url" template with the
    fields of each implementation.
    """
    if is_legacy:
        for analysis in entry:
            yield endpoint_config["url_template"].format(analysis)
    else:
        for implementation in entry["implementations"]:
            yield r_json["url_templates"]["main_url"].format(**implementation)


def _new_analysis_resource(resource_url, analysis_endpoint, endpoint_config, r_json, is_legacy):
    """
    Build (without persisting) a DataResource pointing at ``resource_url``.

    Description and license come from the endpoint configuration for legacy documents
    and from the analyses JSON document itself for later schema versions.
    """
    if is_legacy:
        new_resource = DataResource(
            file_location=resource_url,
            file_type=analysis_endpoint)

        if "description" in endpoint_config:
            new_resource.file_description = str(endpoint_config["description"])

        if "license" in endpoint_config:
            resource_license = get_license(endpoint_config["license"])
            new_resource.file_license = resource_license.id

    else:
        new_resource = DataResource(
            file_location=resource_url,
            file_type=analysis_endpoint,
            file_description=r_json["implementations_description"]
        )

        if "implementations_license" in r_json:
            resource_license = get_license(r_json["implementations_license"])
            new_resource.file_license = resource_license.id

    return new_resource


def _attach_missing_resources(submission, inspire_id, entry, analysis_endpoint, endpoint_config,
                              r_json, is_legacy, analysis_resources):
    """
    Add to ``submission`` every analysis resource of ``entry`` that it does not yet have.

    Resources that are already attached are removed in place from ``analysis_resources``,
    so that whatever is left in that list afterwards is known to be outdated.
    If anything was added, the submission is committed and, when it is the latest
    version of the record, reindexed.
    """
    num_new_resources = 0

    for _resource_url in _iter_resource_urls(entry, endpoint_config, r_json, is_legacy):

        if is_resource_added_to_submission(submission.publication_recid, submission.version,
                                           _resource_url):
            # Still listed by the endpoint: drop it from the candidates for deletion.
            # Slice assignment mutates the caller's list, which the caller relies on.
            analysis_resources[:] = [
                resource for resource in analysis_resources if resource.file_location != _resource_url
            ]
            continue

        log.info('Adding {} analysis to ins{} with URL {}'.format(
            analysis_endpoint, inspire_id, _resource_url)
        )
        submission.resources.append(
            _new_analysis_resource(_resource_url, analysis_endpoint, endpoint_config, r_json, is_legacy)
        )
        num_new_resources += 1

    if not num_new_resources:
        return

    try:
        db.session.add(submission)
        db.session.commit()
        latest_submission = get_latest_hepsubmission(inspire_id=inspire_id)
        if submission.version == latest_submission.version:
            index_record_ids([submission.publication_recid])
    except Exception as e:
        db.session.rollback()
        log.error(e)


def _remove_outdated_resources(analysis_endpoint, analysis_resources):
    """
    Delete resources that were not found in the analyses JSON file, then reindex.

    Only takes action if the submission a resource is linked to is the latest
    finished version of its record (most important case).
    """
    try:
        recids_to_reindex = []
        for extra_analysis_resource in analysis_resources:
            query = db.select([data_reference_link.columns.submission_id]).where(
                data_reference_link.columns.dataresource_id == extra_analysis_resource.id)
            submission_ids = [result[0] for result in db.session.execute(query)]

            if not submission_ids:
                # No link row: previously this either raised NameError or silently reused
                # the submission id left over from the previous resource.
                log.warning('No submission linked to {} analysis with URL {}'.format(
                    analysis_endpoint, extra_analysis_resource.file_location))
                continue

            # Keep the historical choice of the last linked submission.
            submission = HEPSubmission.query.filter_by(id=submission_ids[-1]).first()
            if not submission:
                # Guard before dereferencing: a missing submission must not abort
                # the clean-up of the remaining resources.
                continue

            latest_submission = get_latest_hepsubmission(
                publication_recid=submission.publication_recid, overall_status='finished'
            )
            if latest_submission and submission.version == latest_submission.version:
                log.info('Removing {} analysis with URL {} from submission {} version {}'
                         .format(analysis_endpoint, extra_analysis_resource.file_location,
                                 submission.publication_recid, submission.version))
                db.session.delete(extra_analysis_resource)
                recids_to_reindex.append(submission.publication_recid)

        db.session.commit()

        if recids_to_reindex:
            unique_recids = list(set(recids_to_reindex))  # remove duplicates before indexing
            # For large numbers of records, batch the indexing to reduce memory usage
            batch_size = 100
            for i in range(0, len(unique_recids), batch_size):
                batch_recids = unique_recids[i:i+batch_size]
                index_record_ids(batch_recids)
    except Exception as e:
        db.session.rollback()
        log.error(e)


@shared_task
def update_analyses(endpoint=None):
    """
    Update tools (Rivet, MadAnalysis 5, etc.) analyses and remove outdated resources.
    Allow bulk subscription to record update notifications if "subscribe_user_id" in endpoint.
    Add optional "description" and "license" fields if present in endpoint.

    For each configured endpoint that has an "endpoint_url", the analyses JSON file is
    downloaded and validated against the schema given by its "schema_version" field
    ("0.1.0" if the field is missing). An endpoint is skipped, with a log message, if it
    has no "endpoint_url", the request fails, or the validation fails.

    :param endpoint: any one from config.ANALYSES_ENDPOINTS ("rivet", "MadAnalysis", etc.) or None (default) for all
    """

    endpoints = current_app.config["ANALYSES_ENDPOINTS"]
    for analysis_endpoint in endpoints:

        if endpoint and endpoint != analysis_endpoint:
            continue

        endpoint_config = endpoints[analysis_endpoint]

        if "endpoint_url" not in endpoint_config:
            log.debug("No endpoint_url configured for {0}".format(analysis_endpoint))
            continue

        log.info("Updating analyses from {0}...".format(analysis_endpoint))

        response = resilient_requests('get', endpoint_config["endpoint_url"])

        if not response.ok:
            log.error(f"Error accessing {endpoint_config['endpoint_url']}")
            continue

        # Every stored resource of this type is a candidate for deletion until it is
        # found again in the analyses JSON file.
        analysis_resources = DataResource.query.filter_by(file_type=analysis_endpoint).all()

        r_json = response.json()

        schema_version = "0.1.0"  # default to 0.1.0 for backward compatibility when schema_version field is missing
        if "schema_version" in r_json:
            schema_version = r_json["schema_version"]

        # Validate analyses JSON file against the schema.
        try:
            test_analyses_schema(r_json, schema_version=schema_version)
        except jsonschema.exceptions.ValidationError as e:
            log.error("Validation error for analyses schema {0} in {1}: {2}".format(schema_version, analysis_endpoint, e))
            continue

        is_legacy = schema_version == "0.1.0"  # otherwise schema_version >= "1.0.0"

        # Check for missing analyses.
        for inspire_id, entry in _iter_analysis_entries(r_json, is_legacy):
            submission = get_latest_hepsubmission(inspire_id=inspire_id, overall_status='finished')

            if not submission:
                log.debug("An analysis is available in {0} but with no equivalent in HEPData (ins{1}).".format(
                    analysis_endpoint, inspire_id))
                continue

            _attach_missing_resources(submission, inspire_id, entry, analysis_endpoint, endpoint_config,
                                      r_json, is_legacy, analysis_resources)

        if analysis_resources:
            # Extra resources that were not found in the analyses JSON file.
            _remove_outdated_resources(analysis_endpoint, analysis_resources)

        # Allow bulk subscription to record update notifications.
        if "subscribe_user_id" in endpoint_config:
            user = get_user_from_id(endpoint_config["subscribe_user_id"])
            if user:
                for inspire_id, _entry in _iter_analysis_entries(r_json, is_legacy):
                    submission = get_latest_hepsubmission(inspire_id=inspire_id, overall_status='finished')
                    if submission and not is_current_user_subscribed_to_record(submission.publication_recid, user):
                        subscribe(submission.publication_recid, user)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc