bloomberg / pybossa / 18283471643

06 Oct 2025 02:02PM UTC coverage: 93.875% (-0.2%) from 94.062%

Pull Request #1069 (github, peterkle, "add test"): RDISCROWD-8392 upgrade to boto3

187 of 232 new or added lines in 7 files covered. (80.6%)

5 existing lines in 2 files now uncovered.

17887 of 19054 relevant lines covered (93.88%)

0.94 hits per line

Source File: /pybossa/task_creator_helper.py (94.05% covered)
# -*- coding: utf8 -*-
# This file is part of PYBOSSA.
#
# Copyright (C) 2019 Scifabric LTD.
#
# PYBOSSA is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PYBOSSA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with PYBOSSA.  If not, see <http://www.gnu.org/licenses/>.
"""Module with PyBossa create task helper."""
import os
from flask import current_app
import hashlib
import copy
from flask import url_for
import json
import requests
from six import string_types
from botocore.exceptions import ClientError
from werkzeug.exceptions import InternalServerError, NotFound
from pybossa.util import get_time_plus_delta_ts
from pybossa.cloud_store_api.s3 import upload_json_data, get_content_from_s3
from pybossa.cloud_store_api.s3 import get_content_and_key_from_s3
from pybossa.encryption import AESWithGCM


TASK_PRIVATE_GOLD_ANSWER_FILE_NAME = 'task_private_gold_answer.json'
TASK_GOLD_ANSWER_URL_KEY = 'gold_ans__upload_url'


def encrypted():
    return current_app.config.get('ENABLE_ENCRYPTION')


def bucket_name():
    return current_app.config.get("S3_REQUEST_BUCKET_V2") or current_app.config.get("S3_REQUEST_BUCKET")


def s3_conn_type():
    return current_app.config.get('S3_CONN_TYPE_V2') or current_app.config.get('S3_CONN_TYPE')


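# Illustrative note (not in the original module): the *_V2 config keys take
# precedence and fall back to the legacy keys, so a deployment that has not
# migrated can set only the legacy values. A minimal sketch with made-up
# config values:
#
#   app.config["S3_REQUEST_BUCKET"] = "legacy-bucket"   # no *_V2 keys set
#   with app.app_context():
#       bucket_name()   # -> "legacy-bucket"
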
def get_task_expiration(expiration, create_time):
    """
    Given current task expiration, compute new expiration based on
    1. task creation date and 2. max allowed task expiration
    so that task expiration cannot be set beyond max task expiration
    from task creation date
    """
    max_expiration_days = current_app.config.get('TASK_EXPIRATION', 60)
    max_expiration = get_time_plus_delta_ts(
        create_time, days=max_expiration_days)

    if expiration and isinstance(expiration, string_types):
        max_expiration = max_expiration.isoformat()

    expiration = expiration or max_expiration
    return min(expiration, max_expiration)


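# Illustrative sketch (not in the original module), assuming
# get_time_plus_delta_ts returns a datetime: with TASK_EXPIRATION at its
# default of 60 days, a requested expiration past the cap is clamped while an
# earlier one is kept. ISO-8601 strings compare correctly under min() because
# they sort lexicographically:
#
#   get_task_expiration('2025-01-15T00:00:00', datetime(2025, 1, 1))
#   # -> '2025-01-15T00:00:00' (within the 60-day cap)
#   get_task_expiration('2025-12-31T00:00:00', datetime(2025, 1, 1))
#   # -> '2025-03-02T00:00:00' (clamped to creation + 60 days)
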
def set_gold_answers(task, gold_answers):
    if not gold_answers:
        return
    if encrypted():
        url = upload_files_priv(task, task.project_id, gold_answers,
                                TASK_PRIVATE_GOLD_ANSWER_FILE_NAME)['externalUrl']
        gold_answers = {TASK_GOLD_ANSWER_URL_KEY: url}

    task.gold_answers = gold_answers
    task.calibration = 1
    task.exported = True
    if task.state == 'completed':
        task.state = 'ongoing'


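# Illustrative note (not in the original module): on an encrypted instance the
# raw gold answers never land on the task row; only a fileproxy URL does.
# A hypothetical before/after:
#
#   set_gold_answers(task, {'answer': 42})
#   # plain instance:     task.gold_answers == {'answer': 42}
#   # encrypted instance: task.gold_answers ==
#   #     {'gold_ans__upload_url': '/fileproxy/encrypted/<store>/<bucket>/...'}
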
def upload_files_priv(task, project_id, data, file_name):
    bucket = bucket_name()

    # hashlib.md5() accepts bytes only
    task_hash = hashlib.md5(str(task).encode()).hexdigest()

    path = "{}/{}".format(project_id, task_hash)
    store = s3_conn_type()
    values = dict(
        store=store,
        bucket=bucket,
        project_id=project_id,
        path='{}/{}'.format(task_hash, file_name)
    )
    file_url = url_for('fileproxy.encrypted_file', **values)
    conn_name = "S3_TASK_REQUEST_V2" if store == current_app.config.get(
        "S3_CONN_TYPE_V2") else "S3_TASK_REQUEST"
    internal_url = upload_json_data(
        bucket=bucket,
        json_data=data,
        upload_path=path,
        file_name=file_name,
        encryption=True,
        conn_name=conn_name
    )
    return {'externalUrl': file_url, 'internalUrl': internal_url}


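# Illustrative sketch (not in the original module): assuming the
# fileproxy.encrypted_file route interpolates its parameters in the order
# passed above, the returned URLs look roughly like:
#
#   externalUrl: /fileproxy/encrypted/<store>/<bucket>/<project_id>/<task_hash>/<file_name>
#   internalUrl: the S3 object written under <project_id>/<task_hash>/<file_name>
#
# get_gold_answers() below relies on this external shape when it re-parses the
# store, bucket and key out of the stored URL.
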
def get_gold_answers(task):
    gold_answers = task.gold_answers

    if not encrypted() or gold_answers is None:
        return gold_answers

    url = gold_answers.get(TASK_GOLD_ANSWER_URL_KEY)
    if not url:
        raise Exception(
            'Cannot retrieve Private Gigwork gold answers for task id {}. URL is missing.'.format(task.id))

    # The task instance here is not the same as the one that was used to generate the hash
    # in the upload url. So we can't regenerate that hash here, and instead we have to parse it
    # from the url.

    parts = url.split('/')
    store = parts[3] if len(
        parts) > 3 and parts[1] == "fileproxy" and parts[2] == "encrypted" else None
    conn_name = "S3_TASK_REQUEST_V2" if store == current_app.config.get(
        "S3_CONN_TYPE_V2") else "S3_TASK_REQUEST"
    key_name = '/{}/{}/{}'.format(*parts[-3:])
    current_app.logger.info(
        "gold_answers url %s, store %s, conn_name %s, key %s", url, store, conn_name, key_name)
    decrypted = get_content_from_s3(
        s3_bucket=parts[-4], path=key_name, conn_name=conn_name, decrypt=True)
    return json.loads(decrypted)


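# Illustrative walk-through (not in the original module) of the URL parsing
# above, using a made-up URL of the shape produced by upload_files_priv():
#
#   url   = '/fileproxy/encrypted/s3/mybucket/42/abc123/task_private_gold_answer.json'
#   parts = ['', 'fileproxy', 'encrypted', 's3', 'mybucket', '42', 'abc123',
#            'task_private_gold_answer.json']
#   store    -> 's3'        (parts[3])
#   bucket   -> 'mybucket'  (parts[-4])
#   key_name -> '/42/abc123/task_private_gold_answer.json'  (last 3 parts)
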
def get_path(dict_, path):
    if not path:
        return dict_
    return get_path(dict_[path[0]], path[1:])


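# Illustrative usage (not in the original module): get_path() recursively
# follows a list of keys into a nested dict, e.g.
#
#   get_path({'a': {'b': {'c': 1}}}, ['a', 'b'])  # -> {'c': 1}
#   get_path({'a': {'b': {'c': 1}}}, [])          # -> the whole dict
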
def get_secret_from_env(project_encryption):
    config = current_app.config['SECRET_CONFIG_ENV']
    if not isinstance(config, dict) or "secret_id_prefix" not in config:
        raise RuntimeError("Env secret configuration is not valid")

    secret_id = config.get("secret_id_prefix")
    proj_secret_id = project_encryption.get(secret_id)
    env_secret_id = f"{secret_id}_{proj_secret_id}"
    current_app.logger.info(
        "get_secret_from_env env_secret_id %s", env_secret_id)
    try:
        return os.environ[env_secret_id]
    except Exception:
        raise RuntimeError(
            f"Unable to fetch project encryption key from {env_secret_id}")


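# Illustrative sketch (not in the original module), with made-up values: the
# environment variable name is the configured prefix joined to the
# project-specific id stored under that same prefix key:
#
#   SECRET_CONFIG_ENV  = {'secret_id_prefix': 'proj_key'}
#   project_encryption = {'proj_key': 'team42'}
#   env var consulted  -> os.environ['proj_key_team42']
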
def get_project_encryption(project):
    encryption_jpath = current_app.config.get('ENCRYPTION_CONFIG_PATH')
    if not encryption_jpath:
        return None
    data = project['info']
    for segment in encryption_jpath:
        data = data.get(segment, {})
    return data


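# Illustrative sketch (not in the original module), with a made-up config:
# ENCRYPTION_CONFIG_PATH is a list of keys walked into project['info']:
#
#   ENCRYPTION_CONFIG_PATH = ['ext_config', 'encryption']
#   project['info'] = {'ext_config': {'encryption': {'proj_key': 'team42'}}}
#   get_project_encryption(project)  # -> {'proj_key': 'team42'}
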
def get_encryption_key(project):
    project_encryption = get_project_encryption(project)
    if not project_encryption:
        return

    secret_from_env = current_app.config.get("SECRET_CONFIG_ENV", False)
    if not secret_from_env:
        current_app.logger.exception(
            'Missing env config SECRET_CONFIG_ENV. Cannot process encryption for Project id %d', project.id)
        raise InternalServerError(
            f"Unable to fetch encryption key for project id {project.id}")
    return get_secret_from_env(project_encryption)


def read_encrypted_file(store, project, bucket, key_name):
    conn_name = "S3_TASK_REQUEST_V2" if store == current_app.config.get(
        "S3_CONN_TYPE_V2") else "S3_TASK_REQUEST"
    # download file
    if bucket not in [current_app.config.get("S3_REQUEST_BUCKET"), current_app.config.get("S3_REQUEST_BUCKET_V2")]:
        secret = get_encryption_key(project)
    else:
        secret = current_app.config.get('FILE_ENCRYPTION_KEY')

    try:
        decrypted, key = get_content_and_key_from_s3(
            bucket, key_name, conn_name, decrypt=secret, secret=secret)
    except ClientError as e:
        current_app.logger.exception(
            'Project id {} get task file {} {}'.format(project.id, key_name, e))
        error_code = e.response.get('Error', {}).get('Code', '')
        if error_code == 'NoSuchKey':
            raise NotFound('File Does Not Exist')
        else:
            raise InternalServerError('An Error Occurred')
    return decrypted, key


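# Illustrative note (not in the original module): the secret is chosen by
# bucket. Reads from the platform's own request buckets use the instance-wide
# FILE_ENCRYPTION_KEY; reads from any other (project-owned) bucket use that
# project's key resolved via get_encryption_key(). The botocore ClientError is
# mapped to HTTP errors, e.g. with a made-up key:
#
#   read_encrypted_file('s3', project, 'mybucket', '/42/abc/missing.json')
#   # S3 'NoSuchKey' -> 404 NotFound; any other ClientError -> 500
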
def generate_checksum(project_id, task):
    from pybossa.cache.projects import get_project_data
    from pybossa.core import private_required_fields

    if not (task and isinstance(task, dict) and "info" in task):
        return

    project = get_project_data(project_id)
    if not project:
        current_app.logger.info(
            "Duplicate task checksum not generated. Incorrect project id %s", str(project_id))
        return

    # if the task payload is not a proper dict, the dup checksum cannot be
    # computed and will be set to null
    if not isinstance(task["info"], dict):
        current_app.logger.info("Duplicate task checksum not generated for project %s. Task.info type is %s, expected dict",
                                str(project_id), str(type(task["info"])))
        return

    # drop reserved columns that always carry unique values in certain
    # scenarios; including them in the checksum would make the duplicate
    # check on the remaining fields miss true duplicates
    task_reserved_cols = current_app.config.get("TASK_RESERVED_COLS", [])
    task_info = {k: v for k, v in task["info"].items(
    ) if k not in task_reserved_cols}

    # include all task_info fields when no field is configured under duplicate_fields
    dup_task_config = project.info.get("duplicate_task_check", {})
    dup_fields_configured = dup_task_config.get("duplicate_fields", [])

    task_contents = {}
    if current_app.config.get("PRIVATE_INSTANCE") and dup_task_config:
        # a csv import under a private instance may contain private data under _priv cols.
        # prior to this call, such _priv columns are combined into task.private_fields.
        # collect field name and value from private_fields that are not part of task.info
        private_fields = task.get('private_fields', None)
        if private_fields:
            for field, value in private_fields.items():
                task_contents[field] = value

        for field, value in task_info.items():
            # private required fields are excluded from building duplicate checksum
            if field in private_required_fields:
                continue

            if field.endswith("__upload_url"):
                current_app.logger.info(
                    "generate_checksum file payload name %s, path %s", field, value)
                tokens = value.split("/")
                count_slash = value.count("/")
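                # Illustrative note (not in the original module), with a
                # made-up fileproxy URL of the shape produced by
                # upload_files_priv():
                #   value  = '/fileproxy/encrypted/s3/mybucket/42/abc123/gold.json'
                #   tokens = ['', 'fileproxy', 'encrypted', 's3', 'mybucket',
                #             '42', 'abc123', 'gold.json']
                #   count_slash = 7, store = tokens[3], bucket = tokens[4],
                #   project id = tokens[5], path = '42/abc123/gold.json'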
                if count_slash >= 6 and tokens[1] == "fileproxy" and tokens[2] == "encrypted":
                    store = tokens[3]
                    bucket = tokens[4]
                    project_id_from_url = int(tokens[5])
                    current_app.logger.info(
                        "generate_checksum file tokens %s", str(tokens))
                    if int(project_id) != project_id_from_url:
                        current_app.logger.info("error computing duplicate checksum. incorrect project id in url path. project id expected %s vs actual %s, url %s",
                                                str(project_id), str(project_id_from_url), str(value))
                        continue

                    path = "/".join(tokens[5:])
                    try:
                        current_app.logger.info(
                            "generate_checksum parsed file info. store %s, bucket %s, path %s", store, bucket, path)
                        content, _ = read_encrypted_file(
                            store, project, bucket, path)
                        content = json.loads(content)
                        task_contents.update(content)
                    except Exception as e:
                        current_app.logger.info("error generating duplicate checksum with url contents for project %s, %s, %s %s",
                                                str(project_id), field, str(value), str(e))
                        raise Exception(
                            f"Error generating duplicate checksum with url contents. url {field}, {value}")
                else:
                    current_app.logger.info(
                        "error parsing task data url to compute duplicate checksum %s, %s", field, str(value))
            elif field == "private_json__encrypted_payload":
                try:
                    secret = get_encryption_key(project)
                    cipher = AESWithGCM(secret) if secret else None
                    encrypted_content = task_info.get(
                        "private_json__encrypted_payload")
                    content = cipher.decrypt(
                        encrypted_content) if cipher else encrypted_content
                    content = json.loads(content)
                    task_contents.update(content)
                except Exception as e:
                    current_app.logger.info("error generating duplicate checksum with encrypted payload for project %s, %s, %s %s",
                                            str(project_id), field, str(value), str(e))
                    raise Exception(
                        f"Error generating duplicate checksum with encrypted payload. {field}, {value}")
            else:
                task_contents[field] = value
    else:
        # when duplicate check is not configured, consider all task fields
        task_contents = task_info

    checksum_fields = task_contents.keys(
    ) if not dup_fields_configured else dup_fields_configured
    try:
        checksum_payload = {
            field: task_contents[field] for field in checksum_fields}
        checksum = hashlib.sha256()
        checksum.update(json.dumps(checksum_payload,
                        sort_keys=True).encode("utf-8"))
        checksum_value = checksum.hexdigest()
        return checksum_value
    except KeyError as e:
        private_fields = task.get('private_fields', None)
        task_payload = copy.deepcopy(task_info)
        task_payload["private_fields_keys"] = list(
            private_fields.keys()) if private_fields else []
        current_app.logger.info("error generating duplicate checksum for project id %s, error %s, task payload %s", str(
            project_id), str(e), json.dumps(task_payload))
        raise Exception(
            f"Error generating duplicate checksum due to missing checksum configured fields {checksum_fields}")