• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HEPData / hepdata / 21246856953

22 Jan 2026 11:32AM UTC coverage: 84.743% (+0.07%) from 84.673%
21246856953

Pull #898

github

web-flow
Merge 1a16115c6 into 7db713a49
Pull Request #898: Observer role

145 of 182 new or added lines in 8 files covered. (79.67%)

194 existing lines in 3 files now uncovered.

4860 of 5735 relevant lines covered (84.74%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.41
/hepdata/modules/permissions/api.py
1
# This file is part of HEPData.
2
# Copyright (C) 2016 CERN.
3
#
4
# HEPData is free software; you can redistribute it
5
# and/or modify it under the terms of the GNU General Public License as
6
# published by the Free Software Foundation; either version 2 of the
7
# License, or (at your option) any later version.
8
#
9
# HEPData is distributed in the hope that it will be
10
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12
# General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with HEPData; if not, write to the
16
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
17
# MA 02111-1307, USA.
18
#
19
# In applying this license, CERN does not
20
# waive the privileges and immunities granted to it by virtue of its status
21
# as an Intergovernmental Organization or submit itself to any jurisdiction.
22
import logging
1✔
23

24
from functools import partial
1✔
25
from operator import is_not
1✔
26

27
from flask_login import current_user
1✔
28
from sqlalchemy import or_
1✔
29

30
from hepdata.config import OBSERVER_KEY_LENGTH
1✔
31
from hepdata.modules.permissions.models import SubmissionParticipant, CoordinatorRequest
1✔
32
from hepdata.modules.records.utils.common import get_record_contents
1✔
33
from hepdata.modules.submission.models import HEPSubmission, SubmissionObserver
1✔
34
from hepdata.utils.users import get_user_from_id, user_is_admin
1✔
35

36
# Give the root logger a default handler (does nothing if handlers are already configured).
logging.basicConfig()
1✔
37
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
1✔
38

39

40
def get_records_participated_in_by_user(user):
    """
    Collects the records the given user takes part in, grouped by role.

    :param user: user object; only its ``id`` attribute is read.

    :return: dict with the keys 'uploader', 'reviewer' and 'coordinator'.
        Each value is the list of ``get_record_contents`` results (with
        ``None`` entries dropped) for the matching publication record IDs.
    """
    user_id = user.id

    def _participations(role):
        # Participant rows for this user in the given role, newest first.
        return SubmissionParticipant.query.filter_by(
            user_account=user_id, role=role
        ).order_by(SubmissionParticipant.id.desc()).all()

    def _contents(rows):
        # Look up each record and discard the ones that came back as None.
        fetched = [get_record_contents(row.publication_recid) for row in rows]
        return [content for content in fetched if content is not None]

    uploader_rows = _participations('uploader')
    reviewer_rows = _participations('reviewer')

    coordinated = HEPSubmission.query.filter_by(
        coordinator=user_id
    ).order_by(HEPSubmission.created.desc())

    # special case, since this user ID is the one used for loading all
    # submissions, which is in the 1000s.
    if user_id == 1:
        coordinated = coordinated.limit(5)

    coordinator_rows = coordinated.all()

    return {
        'uploader': _contents(uploader_rows),
        'reviewer': _contents(reviewer_rows),
        'coordinator': _contents(coordinator_rows),
    }
1✔
71

72

73
def get_pending_request(user=current_user):
    """
    Fetches the coordinator requests of the given user that are still queued.

    :param User user: user to check. Defaults to current user.

    :return: list of ``CoordinatorRequest`` rows for this user with
        ``in_queue=True``; the list is empty (falsy) when there are none.
    """
    queued = CoordinatorRequest.query.filter_by(
        user=int(user.get_id()), in_queue=True)

    return queued.all()
1✔
87

88

89
def process_coordinators(coordinators):
    """
    Converts coordinator request rows into plain dictionaries.

    :param coordinators: iterable of objects exposing ``user``, ``message``,
        ``id``, ``approved``, ``in_queue`` and ``collaboration``.

    :return: list with one dict per request, holding those fields plus a
        nested 'user' dict with the 'id' and 'email' of the user returned by
        ``get_user_from_id`` for the request's ``user`` value.
    """
    def _as_dict(request):
        account = get_user_from_id(request.user)
        return {
            'message': request.message,
            'id': request.id,
            'approved': request.approved,
            'in_queue': request.in_queue,
            'collaboration': request.collaboration,
            'user': {'id': account.id, 'email': account.email},
        }

    return [_as_dict(request) for request in coordinators]
1✔
100

101

102
def get_pending_coordinator_requests():
    """
    Returns pending coordinator requests.

    :return: list of dicts, as built by ``process_coordinators``, for every
        ``CoordinatorRequest`` with ``in_queue=True``.
    """
    pending = CoordinatorRequest.query.filter_by(in_queue=True).all()
    return process_coordinators(pending)
1✔
114

115

116
def get_approved_coordinators():
    """
    Returns approved coordinator requests.

    :return: list of dicts, as built by ``process_coordinators``, for every
        ``CoordinatorRequest`` with ``approved=True``, ordered by
        collaboration.
    """
    approved_query = CoordinatorRequest.query.filter_by(approved=True)
    approved = approved_query.order_by(CoordinatorRequest.collaboration).all()
    return process_coordinators(approved)
×
128

129

130
def user_allowed_to_perform_action(recid):
    """
    Determines if the current user is allowed to perform an action on a record.

    :param recid: publication record ID to check.

    :return: ``False`` for unauthenticated users; ``True`` for admins, for
        participants of the record with status 'primary', and for the
        coordinator of a submission with this record ID; ``False`` otherwise.
    """
    if not current_user.is_authenticated:
        return False

    if user_is_admin(current_user):
        return True

    user_id = int(current_user.get_id())

    primary_participations = SubmissionParticipant.query.filter_by(
        user_account=user_id, publication_recid=recid, status='primary'
    ).count()
    if primary_participations > 0:
        return True

    # Not a primary participant: fall back to the coordinator check.
    coordinated_submissions = HEPSubmission.query.filter_by(
        publication_recid=recid, coordinator=user_id
    ).count()
    return coordinated_submissions > 0
×
148

149

150
def write_submissions_to_files():
    """
    Writes some statistics on number of submissions per Coordinator to files.

    Two CSV files are created in the current working directory, both suffixed
    with the current UTC date:

    * ``submissions_per_coordinator_<date>.csv`` holds, for each approved
      Coordinator and for versions 1 and 2, the number of unfinished
      ('todo' or 'processing') and finished submissions.
    * ``submissions_with_date_<date>.csv`` holds one row per finished
      version 1 submission, with its collaboration and dates.

    :return: None
    """
    import csv
    from datetime import datetime, timezone

    # Take the date once so both file names always carry the same suffix.
    today = datetime.now(timezone.utc).date()

    # Context managers close both files even if a query or write fails;
    # newline='' is what the csv module requires of files it writes to.
    with open('submissions_per_coordinator_{}.csv'.format(today), 'w',
              newline='') as csvfile, \
            open('submissions_with_date_{}.csv'.format(today), 'w',
                 newline='') as csvfile1:

        # Number of unfinished and finished submissions for each Coordinator.
        writer = csv.writer(csvfile)
        writer.writerow(['user_id', 'user_email', 'collaboration', 'version',
                         'number_todo', 'number_finished'])

        # Collaboration and date of each finished version 1 submission.
        writer1 = csv.writer(csvfile1)
        writer1.writerow(['collaboration', 'publication_recid', 'inspire_id',
                          'created', 'last_updated'])

        # Loop over approved Coordinators.
        for coordinator in get_approved_coordinators():
            user_id = coordinator['user']['id']
            user_email = coordinator['user']['email']
            collaboration = coordinator['collaboration']

            # For version 1 or version 2, write number of unfinished and
            # finished submissions.
            for version in (1, 2):
                number_todo = HEPSubmission.query.filter(
                    HEPSubmission.coordinator == user_id,
                    or_(HEPSubmission.overall_status == 'todo',
                        HEPSubmission.overall_status == 'processing'),
                    HEPSubmission.version == version).count()
                number_finished = HEPSubmission.query.filter_by(
                    coordinator=user_id,
                    overall_status='finished',
                    version=version).count()
                writer.writerow([user_id, user_email, collaboration,
                                 version, number_todo, number_finished])

            # For each finished version 1 submission, write collaboration
            # and date.
            submissions = HEPSubmission.query.filter_by(
                coordinator=user_id,
                overall_status='finished',
                version=1).order_by(HEPSubmission.last_updated).all()
            for submission in submissions:
                writer1.writerow([collaboration, submission.publication_recid,
                                  submission.inspire_id, submission.created,
                                  submission.last_updated])
×
201

202

203
def verify_observer_key(submission_id, observer_key):
    """
    Verifies the access key used to access a submission without
    login requirement.

    Any invalid input or lookup failure is treated as a failed match, so the
    function only ever returns a bool.

    :param int submission_id:  The requested HEPSubmission for access; it is
        converted with ``int()`` and matched against the observer's
        ``publication_recid``.
    :param str observer_key: The access key used to access the submission
    :returns: Bool representing match status against database
    """

    # Validate inputs
    if submission_id is None or observer_key is None:
        return False

    try:
        submission_id = int(submission_id)
    except (ValueError, TypeError):
        log.warning(f"Invalid submission_id format: {submission_id}")
        return False

    # A key without a length (e.g. an int) can never match, so reject it
    # like any other wrong-length key rather than raising TypeError.
    try:
        key_length = len(observer_key)
    except TypeError:
        key_length = None

    if key_length != OBSERVER_KEY_LENGTH:
        log.warning(f"Invalid observer_key length for submission {submission_id}")
        return False

    # Only the query is guarded; a failure here denies access.
    try:
        submission_observer = SubmissionObserver.query.filter_by(
            publication_recid=submission_id,
            observer_key=observer_key
        ).first()
    except Exception as e:
        log.error(f"Database error in verify_observer_key: {e}")
        return False

    return submission_observer is not None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc