• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bloomberg / pybossa / 25116416987

29 Apr 2026 02:54PM UTC coverage: 94.064% (-0.04%) from 94.101%
25116416987

Pull #1088

github

n00rsy
cr updates
Pull Request #1088: ANNO-342 add db checks to task run to prevent race condition

8 of 17 new or added lines in 2 files covered. (47.06%)

1 existing line in 1 file now uncovered.

18002 of 19138 relevant lines covered (94.06%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.42
/pybossa/api/task_run.py
1
# -*- coding: utf8 -*-
2
# This file is part of PYBOSSA.
3
#
4
# Copyright (C) 2015 Scifabric LTD.
5
#
6
# PYBOSSA is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# PYBOSSA is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with PYBOSSA.  If not, see <http://www.gnu.org/licenses/>.
18
"""
1✔
19
PYBOSSA api module for exposing domain object TaskRun via an API.
20

21
This package adds GET, POST, PUT and DELETE methods for:
22
    * task_runs
23

24
"""
25
from copy import deepcopy
1✔
26
import json
1✔
27
import time
1✔
28

29
from datetime import datetime
1✔
30

31
from flask import request, Response, current_app
1✔
32
from flask import current_app as app
1✔
33
from flask_login import current_user
1✔
34
from werkzeug.exceptions import Forbidden, BadRequest
1✔
35

36
from .api_base import APIBase
1✔
37
from pybossa.model.task_run import TaskRun
1✔
38
from pybossa.util import get_user_id_or_ip
1✔
39
from pybossa.core import task_repo, sentinel, anonymizer, project_repo, user_repo, task_repo
1✔
40
from pybossa.core import performance_stats_repo
1✔
41
from pybossa.cloud_store_api.s3 import s3_upload_from_string
1✔
42
from pybossa.cloud_store_api.s3 import s3_upload_file_storage
1✔
43
from pybossa.contributions_guard import ContributionsGuard
1✔
44
from pybossa.auth import jwt_authorize_project
1✔
45
from pybossa.sched import can_post
1✔
46
from pybossa.core import db
1✔
47
from sqlalchemy.sql import func
1✔
48
from pybossa.model.completion_event import mark_if_complete
1✔
49
from pybossa.cache import delete_memoized
1✔
50
from pybossa.cache.helpers import n_available_tasks_for_user
1✔
51
from pybossa.cloud_store_api.s3 import upload_json_data
1✔
52
from pybossa.model.performance_stats import StatType, PerformanceStats
1✔
53
from pybossa.stats.gold import ConfusionMatrix, RightWrongCount
1✔
54
from pybossa.task_creator_helper import get_gold_answers
1✔
55
from pybossa.view.fileproxy import encrypt_task_response_data
1✔
56

57

58
class TaskRunAPI(APIBase):

    """Class API for domain object TaskRun."""

    # Fallback "created" value used whenever no valid presented time is known.
    DEFAULT_DATETIME = '1900-01-01T00:00:00.000000'
    # Format used to parse the presented timestamps returned by the guard.
    DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'

    # Domain class exposed by this API (APIBase convention).
    __class__ = TaskRun
    # Server-managed fields a client may not send (see _forbidden_attributes).
    reserved_keys = {'id', 'created', 'finish_time'}

    # NOTE(review): presumably enforced by APIBase on updates; not visible here.
    immutable_keys = {'project_id', 'task_id'}

    def _preprocess_post_data(self, data):
        """Validate and transform a task run POST payload in place.

        Rejects anonymous users, checks that the user may post for the task,
        uploads any attached files, and, when ENABLE_ENCRYPTION is set,
        replaces ``data['info']`` with a dict holding only the uploaded
        answer URL (``pyb_answer_url``) and, when produced, the encrypted
        response (``private_json__encrypted_response``).

        :raises Forbidden: for anonymous users or when posting is not allowed.
        """
        with_encryption = app.config.get('ENABLE_ENCRYPTION')
        upload_root_dir = app.config.get('S3_UPLOAD_DIRECTORY')
        conn_name = "S3_TASKRUN_V2" if app.config.get("S3_CONN_TYPE_V2") else "S3_TASKRUN"
        if current_user.is_anonymous:
            raise Forbidden('')
        task_id = data['task_id']
        project_id = data['project_id']
        self.check_can_post(project_id, task_id)
        preprocess_task_run(project_id, task_id, data)
        if with_encryption:
            info = data['info']
            path = "{0}/{1}/{2}".format(project_id, task_id, current_user.id)

            # for tasks with private_json_encrypted_payload, generate
            # encrypted response payload under private_json__encrypted_response
            encrypted_response = encrypt_task_response_data(task_id, project_id, info)
            # The plaintext answer is stored in S3 only, never in the DB row.
            data['info'] = {}
            pyb_answer_url = upload_json_data(json_data=info, upload_path=path,
                file_name='pyb_answer.json', encryption=with_encryption,
                conn_name=conn_name, upload_root_dir=upload_root_dir
            )
            if pyb_answer_url:
                data['info']['pyb_answer_url'] = pyb_answer_url
            if encrypted_response:
                data['info']['private_json__encrypted_response'] = encrypted_response

    def check_can_post(self, project_id, task_id):
        """Raise Forbidden unless the current user/IP may post for the task."""
        if not can_post(project_id, task_id, get_user_id_or_ip()):
            raise Forbidden("You must request a task first!")

    def _custom_filter(self, filters):
        """Add the optional finish-time range query args to *filters*."""
        if request.args.get('from_finish_time'):
            filters['from_finish_time'] = request.args['from_finish_time']
        if request.args.get('to_finish_time'):
            filters['to_finish_time'] = request.args['to_finish_time']
        return filters

    def _update_object(self, taskrun):
        """Validate *taskrun* and fill in user info and timestamps.

        :raises Forbidden: if posting is not allowed, the project/task do not
            match, the task already has enough answers, or the task was not
            requested by this user first.
        """
        self.check_can_post(taskrun.project_id, taskrun.task_id)
        task = task_repo.get_task(taskrun.task_id)
        guard = ContributionsGuard(sentinel.master)

        self._validate_project_and_task(taskrun, task)
        self._check_task_not_over_answered(task)
        self._ensure_task_was_requested(task, guard)
        self._add_user_info(taskrun)
        self._add_timestamps(taskrun, task, guard)

    def _update_attribute(self, new, old):
        """Carry over ``info`` keys from *old* that *new* does not set."""
        for key, value in old.info.items():
            new.info.setdefault(key, value)

    def _forbidden_attributes(self, data):
        """Raise BadRequest if the payload contains any reserved key."""
        for key in data.keys():
            if key in self.reserved_keys:
                raise BadRequest("Reserved keys in payload")

    def _validate_project_and_task(self, taskrun, task):
        """Check the task exists, belongs to the task run's project, and,
        for external-uid submissions, that the JWT authorizes the project.

        :raises Forbidden: on any failed check.
        """
        if task is None:  # pragma: no cover
            raise Forbidden('Invalid task_id')
        if task.project_id != taskrun.project_id:
            raise Forbidden('Invalid project_id')
        if taskrun.external_uid:
            resp = jwt_authorize_project(task.project,
                                         request.headers.get('Authorization'))
            # jwt_authorize_project signals failure by returning a Response.
            if isinstance(resp, Response):
                msg = json.loads(resp.data)['description']
                raise Forbidden(msg)

    def _check_task_not_over_answered(self, task):
        """Reject submission if task already has n_answers task_runs.

        This is a defense-in-depth check against race conditions caused by
        slave DB replication lag during task assignment. Calibration (gold)
        tasks are exempt.

        :raises Forbidden: if the task already has n_answers task runs.
        """
        if task.calibration:
            return
        actual_count = db.session.query(
            func.count(TaskRun.id)
        ).filter_by(task_id=task.id).scalar()
        if actual_count >= task.n_answers:
            current_app.logger.warning(
                "Rejected task_run submission for task %s: "
                "master DB shows %d task_runs >= n_answers %d",
                task.id, actual_count, task.n_answers
            )
            raise Forbidden(
                "This task has already received enough submissions."
            )

    def _ensure_task_was_requested(self, task, guard):
        """Raise Forbidden unless the guard shows the task was requested."""
        if not guard.check_task_stamped(task, get_user_id_or_ip()):
            raise Forbidden('You must request a task first!')

    def _add_user_info(self, taskrun):
        """Attribute *taskrun* to the current user.

        Anonymous users get an anonymized IP, logged-in users their id, and
        external-uid submissions have both fields cleared.
        """
        if taskrun.external_uid is None:
            if current_user.is_anonymous:
                taskrun.user_ip = anonymizer.ip(request.remote_addr or
                                                '127.0.0.1')
            else:
                taskrun.user_id = current_user.id
        else:
            taskrun.user_ip = None
            taskrun.user_id = None

    def _add_created_timestamp(self, taskrun, task, guard):
        """Set ``created`` from the guard's stamp and clear the stamp.

        Not referenced elsewhere in this class (``_update_object`` uses
        ``_add_timestamps``); kept for backward compatibility.
        """
        user_key = get_user_id_or_ip()
        taskrun.created = guard.retrieve_timestamp(task, user_key)
        guard._remove_task_stamped(task, user_key)

    def _after_save(self, original_data, instance):
        """Post-save hooks: completion check, gold stats and quiz progress."""
        if mark_if_complete(instance.task_id, instance.project_id):
            delete_memoized(n_available_tasks_for_user)
        task = task_repo.get_task(instance.task_id)
        gold_answers = get_gold_answers(task)
        update_gold_stats(instance.user_id, instance.task_id, original_data, gold_answers)
        update_quiz(instance.project_id, original_data['info'], gold_answers)

    def _add_timestamps(self, taskrun, task, guard):
        """Set ``created`` and ``finish_time`` on *taskrun*.

        ``finish_time`` is the current UTC time. ``created`` is the presented
        timestamp returned by the guard, or ``DEFAULT_DATETIME`` when none is
        available, it cannot be parsed, or it is not earlier than
        ``finish_time``. Both are stored as ISO-8601 strings.
        """
        now = datetime.utcnow()
        default_created = datetime.strptime(self.DEFAULT_DATETIME,
                                            self.DATETIME_FORMAT)

        # /cachePresentedTime API only caches when there is a user_id
        # otherwise it returns an arbitrary valid timestamp so that answer can be submitted.
        # Fetch it once rather than once for the test and again for the value.
        presented = guard.retrieve_presented_timestamp(task, get_user_id_or_ip())
        created = (self._parse_datetime_or_default(presented)
                   if presented else default_created)

        # Sanity check on datetime objects, not ISO strings: isoformat() drops
        # the fractional part when microsecond == 0, so string comparison can
        # misorder two timestamps within the same second.
        if not created < now:
            # return an arbitrary valid timestamp so that answer can be submitted
            created = default_created

        taskrun.created = created.isoformat()
        taskrun.finish_time = now.isoformat()

    def _parse_datetime_or_default(self, timestamp):
        """Parse *timestamp* with DATETIME_FORMAT; fall back to DEFAULT_DATETIME.

        strptime raises ValueError for a malformed string and TypeError for
        non-str input (e.g. None or bytes).
        """
        try:
            return datetime.strptime(timestamp, self.DATETIME_FORMAT)
        except (TypeError, ValueError):
            # return an arbitrary valid timestamp so that answer can be submitted
            return datetime.strptime(self.DEFAULT_DATETIME, self.DATETIME_FORMAT)

    def _validate_datetime(self, timestamp):
        """Return *timestamp* normalized to ISO format, or the default one."""
        return self._parse_datetime_or_default(timestamp).isoformat()

    def _copy_original(self, item):
        """Return a deep copy of *item*."""
        return deepcopy(item)

    def _customize_response_dict(self, response_dict):
        """Add the task's gold answers to the API response."""
        task = task_repo.get_task(response_dict['task_id'])
        response_dict['gold_answers'] = get_gold_answers(task)
224

225

226
def _upload_files_from_json(task_run_info, upload_path, with_encryption):
1✔
227
    if not isinstance(task_run_info, dict):
1✔
228
        return
1✔
229
    for key, value in task_run_info.items():
1✔
230
        if key.endswith('__upload_url'):
1✔
231
            filename = value.get('filename')
1✔
232
            content = value.get('content')
1✔
233
            upload_root_dir = app.config.get('S3_UPLOAD_DIRECTORY')
1✔
234
            if filename is None or content is None:
1✔
235
                continue
×
236
            bucket = app.config.get("S3_BUCKET_V2") if app.config.get("S3_CONN_TYPE_V2") else app.config.get("S3_BUCKET")
1✔
237
            conn_name = "S3_TASKRUN_V2" if app.config.get("S3_CONN_TYPE_V2") else "S3_TASKRUN"
1✔
238
            out_url = s3_upload_from_string(bucket,
1✔
239
                                            content,
240
                                            filename,
241
                                            directory=upload_path, conn_name=conn_name,
242
                                            with_encryption=with_encryption,
243
                                            upload_root_dir=upload_root_dir)
244
            task_run_info[key] = out_url
1✔
245

246

247
def _upload_files_from_request(task_run_info, files, upload_path, with_encryption):
1✔
248
    for key in files:
1✔
249
        if not key.endswith('__upload_url'):
1✔
250
            raise BadRequest("File upload field should end in __upload_url")
1✔
251
        file_obj = request.files[key]
1✔
252
        bucket = app.config.get("S3_BUCKET_V2") if app.config.get("S3_CONN_TYPE_V2") else app.config.get("S3_BUCKET")
1✔
253
        conn_name = "S3_TASKRUN_V2" if app.config.get("S3_CONN_TYPE_V2") else "S3_TASKRUN"
1✔
254
        s3_url = s3_upload_file_storage(bucket,
1✔
255
                                        file_obj,
256
                                        directory=upload_path, conn_name=conn_name,
257
                                        with_encryption=with_encryption)
258
        task_run_info[key] = s3_url
1✔
259

260

261
def update_gold_stats(user_id, task_id, data, gold_answers=None):
    """Refresh a user's performance stats after answering a calibration task.

    Tasks that are not calibration (gold) tasks are ignored. The project's
    ``answer_fields`` config drives which stats are computed, and
    ``data['info']`` is the submitted answer.

    :param gold_answers: the task's gold answers; looked up when None.
    """
    task = task_repo.get_task(task_id)
    if task.calibration:
        if gold_answers is None:
            gold_answers = get_gold_answers(task)
        project = project_repo.get(task.project_id)
        _update_gold_stats(task.project_id,
                           user_id,
                           project.info.get('answer_fields', {}),
                           gold_answers,
                           data['info'])
277

278
def update_quiz(project_id, answer, gold_answers):
    """Score *answer* against *gold_answers* for the current user's quiz.

    Does nothing unless the user has a quiz in progress for the project.
    Otherwise it records a right or wrong answer and saves the user.
    """
    project = project_repo.get(project_id)
    user = user_repo.get(current_user.id)
    if user.get_quiz_in_progress(project):
        if answer == gold_answers:
            record_answer = user.add_quiz_right_answer
        else:
            record_answer = user.add_quiz_wrong_answer
        record_answer(project)
        user_repo.update(user)
290

291
# Answer-field type (the ``type`` of each entry in a project's
# ``answer_fields`` config) -> performance statistic tracked for it.
field_to_stat_type = dict(
    categorical=StatType.confusion_matrix,
    freetext=StatType.accuracy,
)


# Performance statistic type -> class that computes it.
type_to_class = {
    StatType.confusion_matrix: ConfusionMatrix,
    StatType.accuracy: RightWrongCount,
}
301

302

303
def _update_gold_stats(project_id, user_id, gold_fields, gold_answer, answer):
1✔
304
    for path, specs in gold_fields.items():
1✔
305
        current_app.logger.info(path)
1✔
306
        current_app.logger.info(specs)
1✔
307
        stat_type = field_to_stat_type[specs['type']]
1✔
308
        stats = performance_stats_repo.filter_by(project_id=project_id,
1✔
309
            user_id=user_id, field=path, stat_type=stat_type)
310
        current_app.logger.info(stats)
1✔
311
        create = False
1✔
312
        if not stats:
1✔
313
            stat_row = PerformanceStats(
1✔
314
                project_id=project_id,
315
                user_id=user_id,
316
                field=path,
317
                stat_type=stat_type,
318
                info={})
319
            create = True
1✔
320
        else:
321
            stat_row = stats[0]
1✔
322
        current_app.logger.info(stat_row)
1✔
323

324
        stat_class = type_to_class[stat_type]
1✔
325
        specs['config'].update(stat_row.info)
1✔
326
        stat = stat_class(**specs['config'])
1✔
327
        stat.compute(answer, gold_answer, path)
1✔
328
        stat_row.info = stat.value
1✔
329
        current_app.logger.info('save stats')
1✔
330
        current_app.logger.info(stat_row)
1✔
331
        if create:
1✔
332
            performance_stats_repo.save(stat_row)
1✔
333
        else:
334
            performance_stats_repo.update(stat_row)
1✔
335

336
def preprocess_task_run(project_id, task_id, data):
    """Upload files attached to a task run payload and store their URLs.

    Inline ``__upload_url`` objects in ``data['info']`` and multipart files
    in the current request are uploaded under
    ``<project_id>/<task_id>/<current user id>``. Their entries in
    ``data['info']`` are replaced by the resulting URLs. Payloads without an
    ``info`` key (or with ``info`` set to None) are left unchanged.
    """
    with_encryption = app.config.get('ENABLE_ENCRYPTION')
    task_run_info = data.get('info')
    if task_run_info is not None:
        upload_path = "{0}/{1}/{2}".format(project_id, task_id, current_user.id)
        _upload_files_from_json(task_run_info, upload_path, with_encryption)
        _upload_files_from_request(task_run_info, request.files, upload_path,
                                   with_encryption)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc