• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bloomberg / pybossa / 19482063855

18 Nov 2025 09:34PM UTC coverage: 93.533% (-0.5%) from 94.065%
19482063855

Pull #1075

github

dchhabda
modified boto2-3 migration
Pull Request #1075: RDISCROWD-8392: deprecate old boto. use boto3 only (Updated)

10 of 19 new or added lines in 3 files covered. (52.63%)

87 existing lines in 5 files now uncovered.

17703 of 18927 relevant lines covered (93.53%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.09
/pybossa/api/task_run.py
1
# -*- coding: utf8 -*-
2
# This file is part of PYBOSSA.
3
#
4
# Copyright (C) 2015 Scifabric LTD.
5
#
6
# PYBOSSA is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# PYBOSSA is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with PYBOSSA.  If not, see <http://www.gnu.org/licenses/>.
18
"""
1✔
19
PYBOSSA api module for exposing domain object TaskRun via an API.
20

21
This package adds GET, POST, PUT and DELETE methods for:
22
    * task_runs
23

24
"""
25
from copy import deepcopy
1✔
26
import json
1✔
27
import time
1✔
28

29
from datetime import datetime
1✔
30

31
from flask import request, Response, current_app
1✔
32
from flask import current_app as app
1✔
33
from flask_login import current_user
1✔
34
from werkzeug.exceptions import Forbidden, BadRequest
1✔
35

36
from .api_base import APIBase
1✔
37
from pybossa.model.task_run import TaskRun
1✔
38
from pybossa.util import get_user_id_or_ip
1✔
39
from pybossa.core import task_repo, sentinel, anonymizer, project_repo, user_repo, task_repo
1✔
40
from pybossa.core import performance_stats_repo
1✔
41
from pybossa.cloud_store_api.s3 import s3_upload_from_string
1✔
42
from pybossa.cloud_store_api.s3 import s3_upload_file_storage
1✔
43
from pybossa.contributions_guard import ContributionsGuard
1✔
44
from pybossa.auth import jwt_authorize_project
1✔
45
from pybossa.sched import can_post
1✔
46
from pybossa.model.completion_event import mark_if_complete
1✔
47
from pybossa.cache import delete_memoized
1✔
48
from pybossa.cache.helpers import n_available_tasks_for_user
1✔
49
from pybossa.cloud_store_api.s3 import upload_json_data
1✔
50
from pybossa.model.performance_stats import StatType, PerformanceStats
1✔
51
from pybossa.stats.gold import ConfusionMatrix, RightWrongCount
1✔
52
from pybossa.task_creator_helper import get_gold_answers
1✔
53
from pybossa.view.fileproxy import encrypt_task_response_data
1✔
54

55

56
class TaskRunAPI(APIBase):

    """Class API for domain object TaskRun."""

    # Fallback "created" timestamp used whenever no trustworthy presented
    # timestamp is available, so that the answer can still be submitted.
    DEFAULT_DATETIME = '1900-01-01T00:00:00.000000'
    DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'

    __class__ = TaskRun
    reserved_keys = {'id', 'created', 'finish_time'}

    immutable_keys = {'project_id', 'task_id'}

    def _preprocess_post_data(self, data):
        """Prepare a POSTed task run payload before it is stored.

        Rejects anonymous users, checks that the user may post for the
        task, and uploads any attached files via ``preprocess_task_run``.
        When ``ENABLE_ENCRYPTION`` is set, the answer itself is uploaded
        as ``pyb_answer.json`` and ``data['info']`` is replaced by a dict
        holding the returned URL (and the encrypted response, if any).

        :param data: request payload; must contain ``task_id`` and
            ``project_id``.
        :raises Forbidden: for anonymous users or when ``can_post`` fails.
        """
        if current_user.is_anonymous:
            raise Forbidden('')
        with_encryption = app.config.get('ENABLE_ENCRYPTION')
        upload_root_dir = app.config.get('S3_UPLOAD_DIRECTORY')
        conn_name = "S3_TASKRUN_V2" if app.config.get("S3_CONN_TYPE_V2") else "S3_TASKRUN"
        task_id = data['task_id']
        project_id = data['project_id']
        self.check_can_post(project_id, task_id)
        preprocess_task_run(project_id, task_id, data)
        if with_encryption:
            # NOTE(review): raises KeyError when the payload has no 'info'
            # key, although preprocess_task_run tolerates that - confirm
            # whether such payloads can reach this branch.
            info = data['info']
            path = "{0}/{1}/{2}".format(project_id, task_id, current_user.id)

            # for tasks with private_json_encrypted_payload, generate
            # encrypted response payload under private_json__encrypted_response
            encrypted_response = encrypt_task_response_data(task_id, project_id, info)
            data['info'] = {}
            pyb_answer_url = upload_json_data(
                json_data=info, upload_path=path,
                file_name='pyb_answer.json', encryption=with_encryption,
                conn_name=conn_name, upload_root_dir=upload_root_dir
            )
            if pyb_answer_url:
                data['info']['pyb_answer_url'] = pyb_answer_url
            if encrypted_response:
                data['info']['private_json__encrypted_response'] = encrypted_response

    def check_can_post(self, project_id, task_id):
        """Raise Forbidden unless ``can_post`` allows the current user/IP."""
        if not can_post(project_id, task_id, get_user_id_or_ip()):
            raise Forbidden("You must request a task first!")

    def _custom_filter(self, filters):
        """Copy the finish-time range query arguments into ``filters``.

        Only non-empty ``from_finish_time`` / ``to_finish_time`` request
        arguments are copied. Returns the same ``filters`` dict.
        """
        for arg in ('from_finish_time', 'to_finish_time'):
            if request.args.get(arg):
                filters[arg] = request.args[arg]
        return filters

    def _update_object(self, taskrun):
        """Update task_run object with user id or ip."""
        self.check_can_post(taskrun.project_id, taskrun.task_id)
        task = task_repo.get_task(taskrun.task_id)
        guard = ContributionsGuard(sentinel.master)

        self._validate_project_and_task(taskrun, task)
        self._ensure_task_was_requested(task, guard)
        self._add_user_info(taskrun)
        self._add_timestamps(taskrun, task, guard)

    def _update_attribute(self, new, old):
        """Carry over ``old.info`` entries that ``new.info`` does not set."""
        for key, value in old.info.items():
            new.info.setdefault(key, value)

    def _forbidden_attributes(self, data):
        """Raise BadRequest if the payload contains any reserved key."""
        if any(key in self.reserved_keys for key in data):
            raise BadRequest("Reserved keys in payload")

    def _validate_project_and_task(self, taskrun, task):
        """Check that the task exists and belongs to the task run's project.

        For task runs carrying an ``external_uid`` the request's
        ``Authorization`` header is additionally checked with
        ``jwt_authorize_project``.

        :raises Forbidden: on a missing task, a project mismatch, or when
            the JWT check answers with an error response.
        """
        if task is None:  # pragma: no cover
            raise Forbidden('Invalid task_id')
        if task.project_id != taskrun.project_id:
            raise Forbidden('Invalid project_id')
        if taskrun.external_uid:
            resp = jwt_authorize_project(task.project,
                                         request.headers.get('Authorization'))
            # isinstance (rather than an exact type comparison) also
            # recognises Response subclasses as an error answer.
            if isinstance(resp, Response):
                msg = json.loads(resp.data)['description']
                raise Forbidden(msg)

    def _ensure_task_was_requested(self, task, guard):
        """Raise Forbidden unless the guard has the task stamped for the user."""
        if not guard.check_task_stamped(task, get_user_id_or_ip()):
            raise Forbidden('You must request a task first!')

    def _add_user_info(self, taskrun):
        """Set ``user_id`` or ``user_ip`` on the task run.

        Task runs with an ``external_uid`` get both fields cleared;
        otherwise anonymous users get an anonymized IP and authenticated
        users get their id.
        """
        if taskrun.external_uid is None:
            if current_user.is_anonymous:
                taskrun.user_ip = anonymizer.ip(request.remote_addr or
                                                '127.0.0.1')
            else:
                taskrun.user_id = current_user.id
        else:
            taskrun.user_ip = None
            taskrun.user_id = None

    def _add_created_timestamp(self, taskrun, task, guard):
        """Set ``created`` from the guard's stamp and remove that stamp."""
        taskrun.created = guard.retrieve_timestamp(task, get_user_id_or_ip())
        guard._remove_task_stamped(task, get_user_id_or_ip())

    def _after_save(self, original_data, instance):
        """Post-save hook: completion marking, gold stats and quiz update."""
        if mark_if_complete(instance.task_id, instance.project_id):
            delete_memoized(n_available_tasks_for_user)
        task = task_repo.get_task(instance.task_id)
        gold_answers = get_gold_answers(task)
        update_gold_stats(instance.user_id, instance.task_id, original_data, gold_answers)
        update_quiz(instance.project_id, original_data['info'], gold_answers)

    def _default_created(self):
        """Return ``DEFAULT_DATETIME`` as an ISO formatted string."""
        return datetime.strptime(self.DEFAULT_DATETIME,
                                 self.DATETIME_FORMAT).isoformat()

    def _add_timestamps(self, taskrun, task, guard):
        """Set ``created`` and ``finish_time`` on the task run.

        ``finish_time`` is the current UTC time. ``created`` is the
        presented timestamp held by the guard when there is one and it
        lies before ``finish_time``; otherwise the default timestamp.
        """
        finish_time = datetime.utcnow().isoformat()

        # /cachePresentedTime API only caches when there is a user_id
        # otherwise it returns an arbitrary valid timestamp so that answer can be submitted
        presented = guard.retrieve_presented_timestamp(task, get_user_id_or_ip())
        if presented:
            created = self._validate_datetime(presented)
        else:
            created = self._default_created()

        # sanity check: both values are ISO strings in the same layout, so
        # they are compared as strings. A start time that is not before
        # the finish time is replaced by the default timestamp.
        if not created < finish_time:
            created = self._default_created()

        taskrun.created = created
        taskrun.finish_time = finish_time

    def _validate_datetime(self, timestamp):
        """Return ``timestamp`` re-formatted as ISO, or the default value.

        :param timestamp: value expected to match ``DATETIME_FORMAT``.
        :returns: ISO formatted string; the default timestamp when the
            value cannot be parsed.
        """
        try:
            parsed = datetime.strptime(timestamp, self.DATETIME_FORMAT)
        except (TypeError, ValueError):
            # TypeError: not a str; ValueError: wrong layout.
            # return an arbitrary valid timestamp so that answer can be submitted
            return self._default_created()
        return parsed.isoformat()

    def _copy_original(self, item):
        """Return a deep copy of ``item``."""
        return deepcopy(item)

    def _customize_response_dict(self, response_dict):
        """Add the task's gold answers to the response under ``gold_answers``."""
        task = task_repo.get_task(response_dict['task_id'])
        response_dict['gold_answers'] = get_gold_answers(task)
1✔
200

201

202
def _upload_files_from_json(task_run_info, upload_path, with_encryption):
1✔
203
    if not isinstance(task_run_info, dict):
1✔
204
        return
1✔
205
    for key, value in task_run_info.items():
1✔
206
        if key.endswith('__upload_url'):
1✔
UNCOV
207
            filename = value.get('filename')
×
UNCOV
208
            content = value.get('content')
×
UNCOV
209
            upload_root_dir = app.config.get('S3_UPLOAD_DIRECTORY')
×
UNCOV
210
            if filename is None or content is None:
×
211
                continue
×
UNCOV
212
            bucket = app.config.get("S3_BUCKET_V2") if app.config.get("S3_CONN_TYPE_V2") else app.config.get("S3_BUCKET")
×
UNCOV
213
            conn_name = "S3_TASKRUN_V2" if app.config.get("S3_CONN_TYPE_V2") else "S3_TASKRUN"
×
UNCOV
214
            out_url = s3_upload_from_string(bucket,
×
215
                                            content,
216
                                            filename,
217
                                            directory=upload_path, conn_name=conn_name,
218
                                            with_encryption=with_encryption,
219
                                            upload_root_dir=upload_root_dir)
UNCOV
220
            task_run_info[key] = out_url
×
221

222

223
def _upload_files_from_request(task_run_info, files, upload_path, with_encryption):
1✔
224
    for key in files:
1✔
UNCOV
225
        if not key.endswith('__upload_url'):
×
UNCOV
226
            raise BadRequest("File upload field should end in __upload_url")
×
UNCOV
227
        file_obj = request.files[key]
×
UNCOV
228
        bucket = app.config.get("S3_BUCKET_V2") if app.config.get("S3_CONN_TYPE_V2") else app.config.get("S3_BUCKET")
×
UNCOV
229
        conn_name = "S3_TASKRUN_V2" if app.config.get("S3_CONN_TYPE_V2") else "S3_TASKRUN"
×
UNCOV
230
        s3_url = s3_upload_file_storage(bucket,
×
231
                                        file_obj,
232
                                        directory=upload_path, conn_name=conn_name,
233
                                        with_encryption=with_encryption)
UNCOV
234
        task_run_info[key] = s3_url
×
235

236

237
def update_gold_stats(user_id, task_id, data, gold_answers=None):
    """Update a user's gold performance stats for a calibration task.

    Does nothing when the task is not flagged as ``calibration``.
    Otherwise the project's ``answer_fields`` and the submitted answer
    (``data['info']``) are handed to ``_update_gold_stats``.

    :param user_id: id of the user whose stats are updated.
    :param task_id: id of the answered task.
    :param data: task run payload; its ``info`` entry is the answer.
    :param gold_answers: gold answers for the task; looked up with
        ``get_gold_answers`` when ``None``.
    """
    task = task_repo.get_task(task_id)
    if task.calibration:
        expected = get_gold_answers(task) if gold_answers is None else gold_answers
        project = project_repo.get(task.project_id)
        fields = project.info.get('answer_fields', {})
        submitted = data['info']
        _update_gold_stats(task.project_id, user_id, fields, expected, submitted)
253

254
def update_quiz(project_id, answer, gold_answers):
    """Record a quiz result for the current user, if a quiz is running.

    When the current user has a quiz in progress for the project, the
    answer counts as right if it equals ``gold_answers`` and as wrong
    otherwise, and the user is saved. Without a quiz in progress nothing
    is changed.

    :param project_id: id of the project the answer belongs to.
    :param answer: the submitted answer.
    :param gold_answers: the expected answer to compare against.
    """
    project = project_repo.get(project_id)
    user = user_repo.get(current_user.id)
    if user.get_quiz_in_progress(project):
        is_right = gold_answers == answer
        record = user.add_quiz_right_answer if is_right else user.add_quiz_wrong_answer
        record(project)
        user_repo.update(user)
1✔
266

267
# Maps an answer field's declared 'type' to the statistic kept for it.
field_to_stat_type = {
    'categorical': StatType.confusion_matrix,
    'freetext': StatType.accuracy,
}


# Maps a statistic type to the class that computes it.
type_to_class = {
    StatType.confusion_matrix: ConfusionMatrix,
    StatType.accuracy: RightWrongCount,
}
277

278

279
def _update_gold_stats(project_id, user_id, gold_fields, gold_answer, answer):
1✔
280
    for path, specs in gold_fields.items():
1✔
281
        current_app.logger.info(path)
1✔
282
        current_app.logger.info(specs)
1✔
283
        stat_type = field_to_stat_type[specs['type']]
1✔
284
        stats = performance_stats_repo.filter_by(project_id=project_id,
1✔
285
            user_id=user_id, field=path, stat_type=stat_type)
286
        current_app.logger.info(stats)
1✔
287
        create = False
1✔
288
        if not stats:
1✔
289
            stat_row = PerformanceStats(
1✔
290
                project_id=project_id,
291
                user_id=user_id,
292
                field=path,
293
                stat_type=stat_type,
294
                info={})
295
            create = True
1✔
296
        else:
297
            stat_row = stats[0]
1✔
298
        current_app.logger.info(stat_row)
1✔
299

300
        stat_class = type_to_class[stat_type]
1✔
301
        specs['config'].update(stat_row.info)
1✔
302
        stat = stat_class(**specs['config'])
1✔
303
        stat.compute(answer, gold_answer, path)
1✔
304
        stat_row.info = stat.value
1✔
305
        current_app.logger.info('save stats')
1✔
306
        current_app.logger.info(stat_row)
1✔
307
        if create:
1✔
308
            performance_stats_repo.save(stat_row)
1✔
309
        else:
310
            performance_stats_repo.update(stat_row)
1✔
311

312
def preprocess_task_run(project_id, task_id, data):
    """Upload the files referenced by a task run payload.

    When ``data`` carries an ``info`` entry, inline file payloads inside
    it and files attached to the current request are uploaded under
    ``<project_id>/<task_id>/<current user id>``; ``info`` is updated in
    place with the resulting URLs. A payload without ``info`` is left
    alone.

    :param project_id: project the task run belongs to.
    :param task_id: task the task run answers.
    :param data: task run payload.
    """
    with_encryption = app.config.get('ENABLE_ENCRYPTION')
    info = data.get('info')
    if info is not None:
        path = "{0}/{1}/{2}".format(project_id, task_id, current_user.id)
        _upload_files_from_json(info, path, with_encryption)
        _upload_files_from_request(info, request.files, path, with_encryption)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc