• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bloomberg / pybossa / 19482063855

18 Nov 2025 09:34PM UTC coverage: 93.533% (-0.5%) from 94.065%
19482063855

Pull #1075

github

dchhabda
modified boto2-3 migration
Pull Request #1075: RDISCROWD-8392: deprecate old boto. use boto3 only (Updated)

10 of 19 new or added lines in 3 files covered. (52.63%)

87 existing lines in 5 files now uncovered.

17703 of 18927 relevant lines covered (93.53%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.7
/pybossa/cloud_store_api/s3.py
1
import io
1✔
2
import os
1✔
3
import re
1✔
4
from tempfile import NamedTemporaryFile
1✔
5
from urllib.parse import urlparse
1✔
6
from botocore.exceptions import ClientError
1✔
7
from flask import current_app as app
1✔
8
from werkzeug.utils import secure_filename
1✔
9
import magic
1✔
10
from werkzeug.exceptions import BadRequest
1✔
11
from pybossa.cloud_store_api.connection import create_connection
1✔
12
from pybossa.encryption import AESWithGCM
1✔
13
import json
1✔
14
import time
1✔
15

16

17
# MIME types accepted for upload; check_type() rejects anything else with
# a BadRequest. Membership is tested against the exact string reported by
# libmagic (content sniffing, not file extension).
allowed_mime_types = [
    'application/pdf',
    'text/csv',
    'text/richtext',
    'text/tab-separated-values',
    'text/xml',
    'text/plain',
    'application/oda',
    'text/html',
    'application/xml',
    'image/jpeg',
    'image/png',
    'image/bmp',
    'image/x-ms-bmp',
    'image/gif',
    'application/zip',
    'application/vnd.ms-excel',
    'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    'audio/mpeg',
    'audio/wav',
    'application/json',
    'application/csv',
]


# App-config key under which the default S3 connection kwargs are stored.
DEFAULT_CONN = 'S3_DEFAULT'
41

42

43
def check_type(filename):
    """
    Reject a file whose sniffed MIME type is not in the allow-list.

    The type is detected from the file content via libmagic, not from
    the file extension.

    :param filename: path of the file to inspect
    :raises BadRequest: when the detected type is not an allowed MIME type
    """
    detected = magic.from_file(filename, mime=True)
    if detected not in allowed_mime_types:
        raise BadRequest(f'File type not supported for {filename}: {detected}')
47

48

49
def validate_directory(directory_name):
    """
    Ensure a directory name contains only word characters and slashes.

    :param directory_name: candidate S3 directory path
    :raises RuntimeError: if any character outside [A-Za-z0-9_/] appears
    """
    # Raw string: '[^\w\/]' as a plain literal relies on the deprecated
    # "\w" escape (DeprecationWarning, a future SyntaxError); the escaped
    # slash was also unnecessary.
    invalid_chars = r'[^\w/]'
    if re.search(invalid_chars, directory_name):
        raise RuntimeError('Invalid character in directory name')
53

54

55
def tmp_file_from_string(string):
    """
    Create a temporary file with the given content.

    :param string: text to write, stored UTF-8 encoded
    :return: an open NamedTemporaryFile created with delete=False; the
        caller is responsible for unlinking tmp_file.name when done
    :raises Exception: re-raises any write failure after removing the file
    """
    tmp_file = NamedTemporaryFile(delete=False)
    try:
        # Re-open by name in text mode: the NamedTemporaryFile handle
        # itself is binary ('w+b'), and callers may read from tmp_file
        # afterwards (its position stays at 0).
        with io.open(tmp_file.name, 'w', encoding='utf8') as fp:
            fp.write(string)
    except Exception:
        # Don't leak the file on failure; bare raise keeps the original
        # traceback intact (unlike "raise e").
        os.unlink(tmp_file.name)
        raise
    return tmp_file
67

68

69
def s3_upload_from_string(s3_bucket, string, filename, headers=None,
                          directory='', file_type_check=True,
                          return_key_only=False, conn_name=DEFAULT_CONN,
                          with_encryption=False, upload_root_dir=None):
    """
    Upload a string to s3.

    The string is first spooled to a temporary file, then handed to
    s3_upload_tmp_file, which performs the upload and removes the file.
    """
    spooled = tmp_file_from_string(string)
    return s3_upload_tmp_file(
        s3_bucket, spooled, filename, headers or {}, directory,
        file_type_check, return_key_only, conn_name, with_encryption,
        upload_root_dir)
81

82

83
def s3_upload_file_storage(s3_bucket, source_file, headers=None, directory='',
                           file_type_check=True, return_key_only=False,
                           conn_name=DEFAULT_CONN, with_encryption=False):
    """
    Upload a werkzeug FileStorage content to s3.

    The FileStorage content can only be BytesIO.

    :param s3_bucket: target S3 bucket name
    :param source_file: werkzeug FileStorage object to upload
    :param headers: optional dict of S3 object headers; its Content-Type
        entry is always overwritten with the FileStorage content type
    :param directory: sub-directory under the configured upload root
    :param file_type_check: when True, sniff the MIME type and reject
        unsupported files (see check_type)
    :param return_key_only: return the S3 key name instead of the full url
    :param conn_name: app config key holding the connection kwargs
    :param with_encryption: encrypt the content before uploading
    :return: url of the uploaded file (or key name if return_key_only)
    """
    filename = source_file.filename
    headers = headers or {}
    # NOTE(review): when a headers dict is passed in, it is mutated in
    # place here (Content-Type overwritten) — callers see the change.
    headers['Content-Type'] = source_file.content_type

    # delete=False so the file survives until s3_upload_tmp_file unlinks it.
    tmp_file = NamedTemporaryFile(delete=False)

    # When using the file name (tmp_file.name), save method in the FileStorage
    # class can only open the file in binary mode
    source_file.save(tmp_file.name)
    tmp_file.flush()

    upload_root_dir = app.config.get('S3_UPLOAD_DIRECTORY')
    # s3_upload_tmp_file reads the temp file, uploads it, and deletes it.
    return s3_upload_tmp_file(
            s3_bucket, tmp_file, filename, headers, directory, file_type_check,
            return_key_only, conn_name, with_encryption, upload_root_dir)
105

106

107
def s3_upload_tmp_file(s3_bucket, tmp_file, filename,
                       headers, directory='', file_type_check=True,
                       return_key_only=False, conn_name=DEFAULT_CONN,
                       with_encryption=False,
                       upload_root_dir=None):
    """
    Upload the content of a temporary file to s3 and delete the file.

    :param s3_bucket: target S3 bucket name
    :param tmp_file: open temporary file; always unlinked, even on error
    :param filename: file name as it should appear in S3
    :param headers: dict of S3 object headers
    :param directory: sub-directory under upload_root_dir
    :param file_type_check: reject files with a disallowed MIME type
    :param return_key_only: return the key name instead of the full url
    :param conn_name: app config key holding the connection kwargs
    :param with_encryption: encrypt content with FILE_ENCRYPTION_KEY
    :param upload_root_dir: root path in S3 under which objects are stored
    :return: the url to store as metadata (may differ from uploaded url)
    """
    try:
        if file_type_check:
            check_type(tmp_file.name)
        content = tmp_file.read()
        if with_encryption:
            secret = app.config.get('FILE_ENCRYPTION_KEY')
            cipher = AESWithGCM(secret)
            content = cipher.encrypt(content)

        # make sure content is a bytes string
        # (isinstance is the idiomatic check; also covers str subclasses)
        if isinstance(content, str):
            content = content.encode()
        fp = io.BytesIO(content)  # BytesIO accepts bytes string
        url = s3_upload_file(s3_bucket, fp, filename, headers, upload_root_dir,
                             directory, return_key_only, conn_name)
        bcosv2_prod_util_url = app.config.get('BCOSV2_PROD_UTIL_URL')

        # generate url path to be stored as metadata
        # which can be different from actual uploaded url
        # and is based upon the type of uploaded url path
        meta_url = url
        if bcosv2_prod_util_url and url.startswith(bcosv2_prod_util_url):
            meta_url = url.replace("-util", "")
            app.logger.info("bcosv2 url paths. uploaded path %s, metadata path %s", url, meta_url)

    finally:
        # Remove the temp file regardless of success or failure.
        os.unlink(tmp_file.name)
    return meta_url
143

144

145
def form_upload_directory(directory, filename, upload_root_dir):
    """
    Build the S3 key path upload_root_dir/directory/filename.

    Empty components are skipped; the directory is validated against
    the allowed character set first.
    """
    validate_directory(directory)
    return "/".join(filter(None, (upload_root_dir, directory, filename)))
149

150

151
def s3_upload_file(s3_bucket, source_file, target_file_name,
                   headers, upload_root_dir, directory="",
                   return_key_only=False, conn_name=DEFAULT_CONN):
    """
    Upload a file-type object to S3
    :param s3_bucket: AWS S3 bucket name
    :param source_file: file-like object with the content to upload
    :param target_file_name: file name as should appear in S3
    :param headers: a dictionary of headers to set on the S3 object
    :param upload_root_dir: root path in S3 under which objects are stored
    :param directory: path in S3 where the object needs to be stored
    :param return_key_only: return key name instead of full url
    :param conn_name: app config key holding the connection kwargs
    :raises RuntimeError: if the resulting key name is 256 chars or longer
    """
    filename = secure_filename(target_file_name)
    upload_key = form_upload_directory(directory, filename, upload_root_dir)
    conn_kwargs = app.config.get(conn_name, {})
    conn = create_connection(**conn_kwargs)
    bucket = conn.get_bucket(s3_bucket, validate=False)

    # An assert is stripped under "python -O"; enforce the key-length
    # limit with a real exception instead.
    if len(upload_key) >= 256:
        raise RuntimeError('S3 key name too long: {}'.format(upload_key))
    key = bucket.new_key(upload_key)

    key.set_contents_from_file(
        source_file, headers=headers,
        policy='bucket-owner-full-control')

    if return_key_only:
        return key.name
    # generate_url(0) yields a non-expiring unsigned url; strip any query.
    url = key.generate_url(0, query_auth=False)
    return url.split('?')[0]
180

181

182
def get_s3_bucket_key(s3_bucket, s3_url, conn_name=DEFAULT_CONN):
    """
    Resolve an S3 url to its (bucket, key) pair.

    Both bucket and key are fetched with validate=False, so no round-trip
    to the server happens here.
    """
    conn_kwargs = app.config.get(conn_name, {})
    connection = create_connection(**conn_kwargs)
    bucket = connection.get_bucket(s3_bucket, validate=False)
    key_path = urlparse(s3_url).path
    key = bucket.get_key(key_path, validate=False)
    return bucket, key
190

191

192
def get_file_from_s3(s3_bucket, path, conn_name=DEFAULT_CONN, decrypt=False):
    """
    Fetch an S3 object into a temporary file.

    :param s3_bucket: source S3 bucket name
    :param path: object path within the bucket
    :param conn_name: app config key holding the connection kwargs
    :param decrypt: decrypt the content with FILE_ENCRYPTION_KEY
    :return: a NamedTemporaryFile positioned at offset 0; it is deleted
        automatically when closed (default delete=True)
    """
    content = get_content_from_s3(s3_bucket, path, conn_name, decrypt)
    temp_file = NamedTemporaryFile()
    # The temp file handle is binary; encode decoded text before writing.
    if isinstance(content, str):
        content = content.encode()
    temp_file.write(content)
    temp_file.seek(0)
    return temp_file
200

201

202
def get_content_and_key_from_s3(s3_bucket, path, conn_name=DEFAULT_CONN,
        decrypt=False, secret=None):
    """
    Fetch an S3 object's content together with its key.

    :param s3_bucket: source S3 bucket name
    :param path: object path within the bucket
    :param conn_name: app config key holding the connection kwargs
    :param decrypt: decrypt the content before returning it
    :param secret: decryption key; defaults to FILE_ENCRYPTION_KEY config
    :return: (content, key) — content is str when it decodes as UTF-8,
        otherwise the raw bytes are returned unchanged
    """
    _, key = get_s3_bucket_key(s3_bucket, path, conn_name)
    content = key.get_contents_as_string()
    file_path = f"{s3_bucket}/{path}"
    if decrypt:
        if not secret:
            secret = app.config.get('FILE_ENCRYPTION_KEY')
        cipher = AESWithGCM(secret)
        content = cipher.decrypt(content)
    try:
        if isinstance(content, bytes):
            content = content.decode()
            app.logger.info("get_content_and_key_from_s3. contents decoded")
    except (UnicodeDecodeError, AttributeError) as e:
        # Binary payloads stay as bytes; log and fall through.
        app.logger.info("get_content_and_key_from_s3. file %s exception %s", file_path, str(e))
    return content, key
220

221

222
def get_content_from_s3(s3_bucket, path, conn_name=DEFAULT_CONN, decrypt=False):
    """Return only the content of an S3 object (see get_content_and_key_from_s3)."""
    content, _ = get_content_and_key_from_s3(s3_bucket, path, conn_name, decrypt)
    return content
224

225

226
def delete_file_from_s3(s3_bucket, s3_url, conn_name=DEFAULT_CONN):
    """
    Best-effort delete of the S3 object addressed by s3_url.

    Failures are logged (with traceback via logger.exception) and
    deliberately swallowed so callers are never interrupted by a
    failed cleanup.
    """
    headers = {}
    try:
        bucket, key = get_s3_bucket_key(s3_bucket, s3_url, conn_name)
        bucket.delete_key(key.name, version_id=key.version_id, headers=headers)
    except ClientError:
        # Lazy %-style args; message text is identical to before.
        app.logger.exception('S3: unable to delete file %s', s3_url)
233

234

235
def upload_json_data(json_data, upload_path, file_name, encryption,
    conn_name, upload_root_dir=None, bucket=None):
    """
    Serialize json_data and upload it to S3 under upload_path/file_name.

    When no bucket is given, the V2 bucket is used if S3_CONN_TYPE_V2 is
    configured, otherwise the legacy S3_BUCKET.
    """
    content = json.dumps(json_data, ensure_ascii=False)
    if not bucket:
        if app.config.get("S3_CONN_TYPE_V2"):
            bucket = app.config.get("S3_BUCKET_V2")
        else:
            bucket = app.config.get("S3_BUCKET")

    return s3_upload_from_string(
        bucket, content, file_name, file_type_check=False,
        directory=upload_path, conn_name=conn_name,
        with_encryption=encryption, upload_root_dir=upload_root_dir)
244

245

246
def upload_email_attachment(content, filename, user_email, project_id=None):
    """Upload file to storage location and generate url to download file later.

    :param content: attachment payload to store in S3
    :param filename: original attachment file name
    :param user_email: email of the user allowed to download the file
    :param project_id: optional project to embed in the access signature
    :return: signed download url served via this app's /attachment route
    :raises RuntimeError: if S3_REQUEST_BUCKET_V2 is not configured
    """

    # generate signature for authorised access to the attachment
    # NOTE(review): imports are local, presumably to avoid circular
    # imports with pybossa.core — confirm before hoisting to module level.
    from pybossa.core import signer
    from pybossa.core import sentinel
    from pybossa.redis_lock import register_user_exported_report
    from pybossa.cache.users import get_user_by_email

    # Signature embeds the user email (and project id when given) so the
    # download endpoint can authorize the request.
    payload = {"project_id": project_id} if project_id else {}
    payload["user_email"] = user_email
    signature = signer.dumps(payload)

    # upload contents to s3 storage
    bucket_name = app.config.get("S3_REQUEST_BUCKET_V2")
    conn_name = "S3_TASK_REQUEST_V2"
    if not bucket_name:
        raise RuntimeError("S3_REQUEST_BUCKET_V2 is not configured")

    conn_kwargs = app.config.get(conn_name, {})
    conn = create_connection(**conn_kwargs)
    bucket = conn.get_bucket(bucket_name, validate=False)

    # Generate a unique file path using UTC timestamp and secure filename
    timestamp = int(time.time())
    secure_file_name = secure_filename(filename)
    s3_path = f"attachments/{timestamp}-{secure_file_name}"
    app.logger.info("upload email attachment s3 path %s", s3_path)

    # Upload content to S3
    key = bucket.new_key(s3_path)
    key.set_contents_from_string(content)
    # The public url points at this app's /attachment route (which checks
    # the signature), not directly at S3.
    server_url = app.config.get('SERVER_URL')
    url = f"{server_url}/attachment/{signature}/{timestamp}-{secure_file_name}"
    app.logger.info("upload email attachment url %s", url)
    # Record the export in Redis so the user's report list shows the link.
    user_id = get_user_by_email(user_email).id
    cache_info = register_user_exported_report(user_id, url, sentinel.master)
    app.logger.info("Cache updated for exported report %s", cache_info)
    return url
285

286

287
def s3_get_email_attachment(path):
    """
    Download an email attachment from storage.

    Returns a dict with "name", "type" and "content" keys. When the
    bucket is not configured, or the object cannot be retrieved, the
    defaults (empty name/content, application/octet-stream) are returned.
    """
    attachment = {
        "name": "",
        "type": "application/octet-stream",
        "content": b""
    }

    bucket = app.config.get("S3_REQUEST_BUCKET_V2")
    if not bucket:
        return attachment

    s3_path = f"attachments/{path}"
    content, key = get_content_and_key_from_s3(
        s3_bucket=bucket, path=s3_path, conn_name="S3_TASK_REQUEST_V2")
    if content and key:
        app.logger.info("email attachment path %s, s3 file path %s, key name %s, key content_type %s",
                path, s3_path, key.name, key.content_type)
        attachment["name"] = key.name
        attachment["type"] = key.content_type
        attachment["content"] = content
    return attachment
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc