• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bloomberg / pybossa / 11294393823

11 Oct 2024 02:42PM UTC coverage: 94.022% (+0.007%) from 94.015%
11294393823

Pull #982

github

dchhabda
delete task logs
Pull Request #982: delete task logs

9 of 9 new or added lines in 1 file covered. (100.0%)

56 existing lines in 2 files now uncovered.

17379 of 18484 relevant lines covered (94.02%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.48
/pybossa/util.py
1
# -*- coding: utf8 -*-
2
# This file is part of PYBOSSA.
3
#
4
# Copyright (C) 2015 Scifabric LTD.
5
#
6
# PYBOSSA is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# PYBOSSA is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with PYBOSSA.  If not, see <http://www.gnu.org/licenses/>.
18
"""Module with PyBossa utils."""
1✔
19
import base64
1✔
20
import codecs
1✔
21
import csv
1✔
22
import hashlib
1✔
23
import hmac
1✔
24
import io
1✔
25
import json
1✔
26
import os
1✔
27
import random
1✔
28
import re
1✔
29
import time
1✔
30
from collections import OrderedDict
1✔
31
from datetime import timedelta, datetime, date
1✔
32
from enum import Enum
1✔
33
from functools import update_wrapper
1✔
34
from functools import wraps
1✔
35
from math import ceil
1✔
36
from tempfile import NamedTemporaryFile
1✔
37

38
from pybossa import app_settings
1✔
39

40
import dateutil.parser
1✔
41
import dateutil.tz
1✔
42
import simplejson
1✔
43
from flask import abort, request, make_response, current_app, url_for
1✔
44
from flask import redirect, render_template, jsonify, get_flashed_messages
1✔
45
from flask_babel import lazy_gettext
1✔
46
from flask_login import current_user
1✔
47
# Markdown support
48
from flask_misaka import Misaka
1✔
49
from flask_wtf import FlaskForm as Form
1✔
50
from flask_wtf.csrf import generate_csrf
1✔
51
from sqlalchemy import text
1✔
52
from sqlalchemy.exc import ProgrammingError
1✔
53
from yacryptopan import CryptoPAn
1✔
54

55
from pybossa.cloud_store_api.connection import create_connection
1✔
56
from pybossa.cloud_store_api.s3 import get_file_from_s3, delete_file_from_s3
1✔
57
from pybossa.cloud_store_api.s3 import s3_upload_file_storage
1✔
58

59
from bs4 import BeautifulSoup
1✔
60

61
# Flask-Misaka extension instance (Markdown support).
misaka = Misaka()
# Tag names of input components (presumably task-presenter components,
# going by the TP_ prefix -- confirm against the callers).
TP_COMPONENT_TAGS = ["text-input", "dropdown-input", "radio-group-input",
                     "checkbox-input", "multi-select-input", "input-text-area", "text-tagging"]

# Key templates for partial answers; the {project_id}, {user_id} and
# {task_id} placeholders are meant to be filled in with str.format().
PARTIAL_ANSWER_PREFIX = "partial_answer:project:{project_id}:user:{user_id}"
PARTIAL_ANSWER_KEY = PARTIAL_ANSWER_PREFIX + ":task:{task_id}"
PARTIAL_ANSWER_POSITION_KEY = PARTIAL_ANSWER_PREFIX + ":position"
1✔
68

69
class SavedTaskPositionEnum(str, Enum):
    """String-valued enumeration of saved task positions.

    Members also subclass ``str``, so they compare equal to their plain
    string values ("first" / "last").
    """

    FIRST = "first"
    LAST = "last"
1✔
72

73
def last_flashed_message():
    """Return the most recently flashed message, or None if there is none.

    Messages are fetched with ``with_categories=True``, so the returned
    value is a ``(category, message)`` tuple.
    """
    flashed = get_flashed_messages(with_categories=True)
    return flashed[-1] if flashed else None
1✔
80

81

82
def form_to_json(form):
    """Return the form data as a dict, extended with errors and a CSRF token.

    :param form: form object exposing ``data`` and ``errors``.
    :returns: ``form.data`` with the extra keys ``errors`` and ``csrf``.
    """
    payload = form.data
    payload['errors'] = form.errors
    payload['csrf'] = generate_csrf()
    return payload
1✔
88

89
def user_to_json(user):
    """Return a user in JSON format by delegating to ``user.dictize()``."""
    serialized = user.dictize()
    return serialized
1✔
92

93
def hash_last_flash_message():
    """Base64 encode the last flash message.

    The last flashed ``(category, message)`` pair is stored as
    ``{"flash": message, "status": category}``; when nothing was flashed an
    empty JSON object is encoded instead.

    :returns: Base64-encoded JSON as bytes.
    """
    last = last_flashed_message()
    payload = {}
    if last:
        payload['flash'] = last[1]
        payload['status'] = last[0]
    # b64encode accepts bytes only
    encoded_json = json.dumps(payload).encode()
    return base64.b64encode(encoded_json)
1✔
102

103
def handle_content_type(data):
    """Return HTML or JSON based on request type.

    A JSON response is produced when the request's ``Content-Type`` header
    is ``application/json`` or its ``response_format`` query argument is
    ``json``. Otherwise ``data['template']`` is rendered with the remaining
    entries of ``data`` as template context.

    For JSON responses the last flashed message (if any) is stored under
    ``flash``/``status``, and well-known entries are converted in place to
    JSON-friendly values before ``jsonify`` is called.

    :param data: dict of response values; it is modified in place.
    :returns: the response, or a ``(response, code)`` tuple when ``data``
        contains a ``code`` entry.
    """
    wants_json = (request.headers.get('Content-Type') == 'application/json' or
                  request.args.get('response_format') == 'json')
    if wants_json:
        message_and_status = last_flashed_message()
        if message_and_status:
            data['flash'] = message_and_status[1]
            data['status'] = message_and_status[0]
        # Only existing keys are reassigned below, so iterating the live
        # key view is safe (the dict never changes size).
        for item in data.keys():
            if isinstance(data[item], Form):
                data[item] = form_to_json(data[item])
            if isinstance(data[item], Pagination):
                data[item] = data[item].to_json()
            if item == 'announcements':
                data[item] = [announcement.to_public_json()
                              for announcement in data[item]]
            if item == 'blogposts':
                data[item] = [blog.to_public_json() for blog in data[item]]
            if item == 'categories':
                data[item] = [cat if isinstance(cat, dict)
                              else cat.to_public_json()
                              for cat in data[item]]
            # Convert only when the value is not already a dict. Previously
            # a dict value was replaced by an unrelated (or undefined)
            # ``cat`` variable.
            if item == 'active_cat' and not isinstance(data[item], dict):
                data[item] = data[item].to_public_json()
            if item == 'users' and type(data[item]) != str:
                data[item] = [user_to_json(user) for user in data[item]]
            if (item in ('users', 'projects', 'tasks', 'locs')
                    and type(data[item]) == str):
                data[item] = json.loads(data[item])
            if item == 'found' and isinstance(data[item], list):
                if len(data[item]) and not isinstance(data[item][0], dict):
                    data[item] = [user_to_json(user) for user in data[item]]
            if item == 'category':
                data[item] = data[item].to_public_json()
        if 'code' in data:
            return jsonify(data), data['code']
        return jsonify(data)

    template = data.pop('template')
    if 'code' in data:
        error_code = data.pop('code')
        return render_template(template, **data), error_code
    return render_template(template, **data)
1✔
153

154
def is_own_url(url):
    """Tell whether ``url`` points at this server.

    URLs without a network location (relative URLs) are always accepted.
    Otherwise the host must be exactly the configured ``SERVER_NAME``, or
    ``SERVER_NAME`` followed by a port. A plain prefix match is not enough:
    it would accept look-alike hosts such as ``example.com.evil.org`` for a
    ``SERVER_NAME`` of ``example.com``.

    :param url: the URL to check.
    :returns: True when the URL is relative or targets this server.
    """
    from urllib.parse import urlparse
    domain = urlparse(url).netloc
    if not domain:
        return True
    server_name = current_app.config.get('SERVER_NAME', '-')
    if not server_name:
        # An unset/empty server name cannot vouch for any absolute URL.
        return False
    return domain == server_name or domain.startswith(server_name + ':')
1✔
158

159
def is_own_url_or_else(url, default):
    """Return ``url`` when ``is_own_url`` accepts it, otherwise ``default``."""
    if is_own_url(url):
        return url
    return default
1✔
161

162
def redirect_content_type(url, status=None):
    """Redirect to ``url``, or describe the redirect as JSON.

    For JSON requests (``Content-Type: application/json`` header or
    ``response_format=json`` query argument) the target is handed to
    ``handle_content_type`` as ``{'next': url}``, plus ``status`` when one
    is given. Any other request gets a regular redirect.
    """
    payload = {'next': url}
    if status is not None:
        payload['status'] = status
    json_header = request.headers.get('Content-Type') == 'application/json'
    if json_header or request.args.get('response_format') == 'json':
        return handle_content_type(payload)
    return redirect(url)
1✔
171

172
def static_vars(**kwargs):
    """Decorator factory that sets each keyword argument as a function attribute.

    The decorated function itself is returned (it is not wrapped).
    """
    def decorate(func):
        for attr_name, attr_value in kwargs.items():
            setattr(func, attr_name, attr_value)
        return func
    return decorate
1✔
178

179
def url_for_app_type(endpoint, _hash_last_flash=False, **values):
    """Generate a URL for an SPA, or otherwise.

    Without a configured ``SPA_SERVER_NAME`` this is plain ``url_for``.
    With one, ``_external`` is dropped from ``values``, the encoded last
    flash message is added as ``flash`` when ``_hash_last_flash`` is set,
    and the SPA server name is prepended to the generated URL.
    """
    spa_server_name = current_app.config.get('SPA_SERVER_NAME')
    if not spa_server_name:
        return url_for(endpoint, **values)

    values.pop('_external', None)
    if _hash_last_flash:
        values['flash'] = hash_last_flash_message()
    return spa_server_name + url_for(endpoint, **values)
1✔
189

190

191
def jsonpify(f):
    """Wrap JSONified output for JSONP.

    When the request carries a ``callback`` query argument, the body of the
    wrapped view's response is emitted as ``callback(<body>)`` with the
    ``application/javascript`` mimetype. Without it the view's response is
    returned untouched.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', False)
        response = f(*args, **kwargs)
        if not callback:
            return response
        body = response.data
        # The response body is bytes; decode it instead of calling str() on
        # it, which would embed the bytes repr (b'...') in the script.
        if isinstance(body, bytes):
            body = body.decode('utf-8')
        content = str(callback) + '(' + str(body) + ')'
        return current_app.response_class(content,
                                          mimetype='application/javascript')
    return decorated_function
1✔
203

204

205
def admin_required(f):  # pragma: no cover
    """Decorator that aborts with 403 unless ``current_user.admin`` is truthy."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.admin:
            return abort(403)
        return f(*args, **kwargs)
    return decorated_function
214

215

216
def admin_or_subadmin_required(f):  # pragma: no cover
    """Decorator that aborts with 403 unless the user is admin or subadmin."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        allowed = current_user.admin or current_user.subadmin
        if not allowed:
            return abort(403)
        return f(*args, **kwargs)
    return decorated_function
225

226

227
# from http://flask.pocoo.org/snippets/56/
228
def crossdomain(origin=None, methods=None, headers=None,
                max_age=21600, attach_to_all=True,
                automatic_options=True):
    """Crossdomain decorator.

    Builds a decorator that adds ``Access-Control-*`` headers to the
    response of the decorated view.

    :param origin: allowed origin as a string, or an iterable of origins
        that is joined with ``', '``.
    :param methods: optional iterable of HTTP methods; upper-cased, sorted
        and joined. When omitted, the ``allow`` header of the default
        OPTIONS response is used.
    :param headers: optional string or iterable of header names (iterables
        are upper-cased and joined).
    :param max_age: seconds, or a ``timedelta``, for ``Access-Control-Max-Age``.
    :param attach_to_all: when false, only OPTIONS responses get the headers.
    :param automatic_options: when true, OPTIONS requests are answered with
        the application's default OPTIONS response instead of calling the view.
    """
    if methods is not None:  # pragma: no cover
        methods = ', '.join(sorted(verb.upper() for verb in methods))
    if headers is not None and not isinstance(headers, str):
        headers = ', '.join(name.upper() for name in headers)
    if not isinstance(origin, str):  # pragma: no cover
        origin = ', '.join(origin)
    if isinstance(max_age, timedelta):  # pragma: no cover
        max_age = max_age.total_seconds()

    def get_methods():  # pragma: no cover
        if methods is None:
            default_resp = current_app.make_default_options_response()
            return default_resp.headers['allow']
        return methods

    def decorator(f):

        def wrapped_function(*args, **kwargs):
            if automatic_options and request.method == 'OPTIONS':  # pragma: no cover
                resp = current_app.make_default_options_response()
            else:
                resp = make_response(f(*args, **kwargs))
            if not attach_to_all and request.method != 'OPTIONS':  # pragma: no cover
                return resp

            resp.headers['Access-Control-Allow-Origin'] = origin
            resp.headers['Access-Control-Allow-Methods'] = get_methods()
            resp.headers['Access-Control-Max-Age'] = str(max_age)
            if headers is not None:
                resp.headers['Access-Control-Allow-Headers'] = headers
            return resp

        # OPTIONS handling is done by wrapped_function above.
        f.provide_automatic_options = False
        return update_wrapper(wrapped_function, f)
    return decorator
1✔
270

271

272
def parse_date_string(source):
    """Best-effort conversion of ``source`` to a datetime.

    ``date``/``datetime`` instances are returned unchanged. Anything else is
    converted to ``str`` and parsed with ``dateutil``; when parsing fails
    the original value is returned as-is.
    """
    if isinstance(source, (date, datetime)):
        return source
    try:
        return dateutil.parser.parse(str(source))
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        return source
×
280

281

282
def convert_est_to_utc(source):
    """Interpret ``source`` as America/New_York wall-clock time and return it in UTC.

    ``source`` is first passed through ``parse_date_string``; its tzinfo
    (if any) is replaced, not converted.
    """
    parsed = parse_date_string(source)
    eastern = dateutil.tz.gettz('America/New_York')
    utc_zone = dateutil.tz.gettz('UTC')
    # naive to EST to UTC
    as_eastern = parsed.replace(tzinfo=eastern)
    return as_eastern.astimezone(utc_zone)
1✔
290

291

292
def convert_utc_to_est(source):
    """Interpret ``source`` as UTC and return it in America/New_York time.

    ``source`` is first passed through ``parse_date_string``; its tzinfo
    (if any) is replaced, not converted.
    """
    parsed = parse_date_string(source)
    eastern = dateutil.tz.gettz('America/New_York')
    utc_zone = dateutil.tz.gettz('UTC')
    # naive to UTC to EST
    as_utc = parsed.replace(tzinfo=utc_zone)
    return as_utc.astimezone(eastern)
1✔
300

301

302
# From http://stackoverflow.com/q/1551382
303
def pretty_date(time=False):
    """Return a pretty date.

    Get a datetime object, an Epoch timestamp (int or float) or a date
    string and return a pretty string like 'an hour ago', 'Yesterday',
    '3 months ago', 'just now', etc. A falsy non-numeric value (the default)
    means "now". Dates in the future give an empty string.

    :raises TypeError: when ``time`` is of an unsupported type (previously
        this surfaced as an UnboundLocalError).
    """
    now = datetime.now()
    if isinstance(time, str):
        time = dateutil.parser.parse(time)

    if isinstance(time, datetime):
        diff = now - time
    elif type(time) in (int, float):
        # Exact type check: booleans are not treated as timestamps.
        diff = now - datetime.fromtimestamp(time)
    elif not time:
        diff = now - now
    else:
        raise TypeError(
            "pretty_date() does not support {!r}".format(type(time)))

    second_diff = diff.seconds
    day_diff = diff.days

    if day_diff < 0:
        return ''

    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str(second_diff) + " seconds ago"
        if second_diff < 120:
            return "a minute ago"
        if second_diff < 3600:
            return ' '.join([str(second_diff // 60), "minutes ago"])
        if second_diff < 7200:
            return "an hour ago"
        if second_diff < 86400:
            return ' '.join([str(second_diff // 3600), "hours ago"])
    if day_diff == 1:
        return "Yesterday"
    if day_diff < 7:
        return ' '.join([str(day_diff), "days ago"])
    if day_diff < 31:
        return ' '.join([str(day_diff // 7), "weeks ago"])
    if day_diff < 60:
        return ' '.join([str(day_diff // 30), "month ago"])
    if day_diff < 365:
        return ' '.join([str(day_diff // 30), "months ago"])
    if day_diff < (365 * 2):
        return ' '.join([str(day_diff // 365), "year ago"])
    return ' '.join([str(day_diff // 365), "years ago"])
1✔
353

354

355
def datetime_filter(source, fmt):
    """Format a UTC timestamp in America/New_York time using ``fmt``.

    ``date``/``datetime`` values are used directly; other values are parsed
    as ``%Y-%m-%dT%H:%M:%S.%f`` strings. A value that cannot be parsed is
    returned unchanged.
    """
    moment = source
    if not isinstance(moment, (date, datetime)):
        try:
            moment = datetime.strptime(str(moment), "%Y-%m-%dT%H:%M:%S.%f")
        except Exception:
            return source

    utc_zone = dateutil.tz.gettz('UTC')
    eastern = dateutil.tz.gettz('America/New_York')

    # naive to UTC to local
    localized = moment.replace(tzinfo=utc_zone).astimezone(eastern)
    return localized.strftime(fmt)
1✔
369

370

371
def validate_ownership_id(ownership_id):
    """Validate an ownership id.

    ``None`` and empty values are accepted; otherwise the id must be
    numeric and at most 20 characters long.
    """
    is_blank = ownership_id is None or len(ownership_id) == 0
    if is_blank:
        return True
    return ownership_id.isnumeric() and len(ownership_id) <= 20
1✔
375

376
class Pagination(object):

    """Pagination state for a collection of domain objects."""

    def __init__(self, page, per_page, total_count):
        """Store the current page, the page size and the total item count."""
        self.page, self.per_page, self.total_count = page, per_page, total_count

    @property
    def pages(self):
        """Return the number of pages needed for ``total_count`` items."""
        exact_pages = self.total_count / float(self.per_page)
        return int(ceil(exact_pages))

    @property
    def has_prev(self):
        """Return whether a previous page exists for a valid current page."""
        return 1 < self.page <= self.pages

    @property
    def has_next(self):
        """Return whether a page follows the current one."""
        return self.page < self.pages

    def iter_pages(self, left_edge=0, left_current=2, right_current=3,
                   right_edge=0):
        """Yield the page numbers to display, with None marking a gap.

        A number is shown when it lies within ``left_edge`` pages of the
        start, within ``right_edge`` pages of the end, or inside the window
        around the current page.
        """
        total_pages = self.pages
        window_start = self.page - left_current - 1
        window_end = self.page + right_current
        previous = 0
        for number in range(1, total_pages + 1):
            near_start = number <= left_edge
            near_current = window_start < number < window_end
            near_end = number > total_pages - right_edge
            if not (near_start or near_current or near_end):
                continue
            if previous + 1 != number:  # pragma: no cover
                yield None
            yield number
            previous = number

    def to_json(self):
        """Return the pagination state as a JSON-serializable dict."""
        return {
            'page': self.page,
            'per_page': self.per_page,
            'total': self.total_count,
            'next': self.has_next,
            'prev': self.has_prev,
        }

    @property
    def curr_page_count(self):
        """Return the item count of the current page.

        A page followed by another one is full. The last page holds the
        remainder (or a full page when the total divides evenly). When there
        is neither a next nor a previous page, 0 is returned.
        """
        if self.has_next:
            return self.per_page
        if not self.has_prev:
            return 0
        remainder = self.total_count % self.per_page
        return remainder or self.per_page
1✔
434

435

436
def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
    """Return a ``csv.reader`` over ``unicode_csv_data``.

    ``dialect`` and any extra keyword arguments are passed to ``csv.reader``.
    """
    # TODO - consider using pandas to read csv in the future
    return csv.reader(unicode_csv_data, dialect=dialect, **kwargs)
1✔
443

444

445
def get_user_signup_method(user):
    """Return which OAuth sign up method the user used.

    ``user.info`` is checked for a Google, Facebook or Twitter token, in
    that order; without any of them (or without ``info``) the account is
    reported as local.

    :returns: a ``(message, method)`` tuple where ``method`` is one of
        'google', 'facebook', 'twitter' or 'local'.
    """
    msg = 'Sorry, there is already an account with the same e-mail.'
    oauth_hints = (
        ('google_token', 'google',
         " <strong>It seems like you signed up with your Google account.</strong>"
         "<br/>You can try and sign in by clicking in the Google button."),
        ('facebook_token', 'facebook',
         " <strong>It seems like you signed up with your Facebook account.</strong>"
         "<br/>You can try and sign in by clicking in the Facebook button."),
        ('twitter_token', 'twitter',
         " <strong>It seems like you signed up with your Twitter account.</strong>"
         "<br/>You can try and sign in by clicking in the Twitter button."),
    )
    info = user.info
    if info:
        for token_key, method, hint in oauth_hints:
            if info.get(token_key):
                return (msg + hint, method)
    # Local account
    msg += " <strong>It seems that you created an account locally.</strong>"
    msg += " <br/>You can reset your password if you don't remember it."
    return (msg, 'local')
1✔
473

474

475
def get_port():
    """Get port.

    The ``PORT`` environment variable wins when it consists only of digits;
    otherwise the application's ``PORT`` setting is returned.
    """
    import os
    raw_port = os.environ.get('PORT', '')
    if not raw_port.isdigit():
        return current_app.config['PORT']
    return int(raw_port)
1✔
483

484

485
def get_user_id_or_ip():
    """Return the id of the current user if is authenticated.
    Otherwise returns its IP address (defaults to 127.0.0.1).

    The IP address is anonymized with CryptoPAn, keyed by the
    ``CRYPTOPAN_KEY`` setting. The result is a dict with the keys
    ``user_id``, ``user_ip`` and ``external_uid`` (the latter read from the
    request's query arguments).
    """
    anonymizer = CryptoPAn(current_app.config.get('CRYPTOPAN_KEY').encode())

    user_id = None
    if current_user.is_authenticated:
        user_id = current_user.id

    user_ip = None
    if current_user.is_anonymous:
        user_ip = anonymizer.anonymize(request.remote_addr or "127.0.0.1")

    return {
        'user_id': user_id,
        'user_ip': user_ip,
        'external_uid': request.args.get('external_uid'),
    }
1✔
495

496

497
def with_cache_disabled(f):
    """Decorator that disables the cache for the execution of a function.
    It enables it back when the function call is done.

    The cache is switched off through the ``PYBOSSA_REDIS_CACHE_DISABLED``
    environment variable. Its previous state (unset, or its previous value)
    is restored afterwards, even when the wrapped function raises.
    """
    import os

    @wraps(f)
    def wrapper(*args, **kwargs):
        env_cache_disabled = os.environ.get('PYBOSSA_REDIS_CACHE_DISABLED')
        if env_cache_disabled is None or env_cache_disabled == '0':
            os.environ['PYBOSSA_REDIS_CACHE_DISABLED'] = '1'
        try:
            return f(*args, **kwargs)
        finally:
            # Restore in ``finally`` so that an exception in ``f`` does not
            # leave the cache disabled for the rest of the process.
            if env_cache_disabled is None:
                os.environ.pop('PYBOSSA_REDIS_CACHE_DISABLED', None)
            else:
                os.environ['PYBOSSA_REDIS_CACHE_DISABLED'] = env_cache_disabled
    return wrapper
1✔
515

516

517
def is_reserved_name(blueprint, name):
    """Check if a name has already been registered inside a blueprint URL.

    Every URL rule starting with ``/<blueprint>`` is inspected; its second
    path segment (when present and non-empty) counts as reserved.

    :returns: True when ``name`` equals one of the reserved segments.
    """
    path = ''.join(['/', blueprint])
    for rule in current_app.url_map.iter_rules():
        if not rule.rule.startswith(path):
            continue
        segments = rule.rule.split('/')
        # Rules with a single path segment have no second segment; skip
        # them instead of failing with an IndexError.
        if len(segments) > 2 and segments[2] != '' and segments[2] == name:
            return True
    return False
1✔
525

526

527
def username_from_full_name(username):
    """Takes a username that may contain several words with capital letters and
    returns a single word username (string type), no spaces, all lowercases.

    Non-ASCII characters are dropped."""
    compact = username.replace(' ', '').lower()
    return compact.encode('ascii', 'ignore').decode()
1✔
533

534

535
def rank(projects, order_by=None, desc=False):
    """By default, takes a list of (published) projects (as dicts) and orders
    them by activity, number of volunteers, number of tasks and other criteria.

    Alternatively ranks by order_by and desc.

    The list is sorted in place and also returned.
    """
    def earned_points(project):
        # Unfinished projects get a large head start.
        unfinished_bonus = 1000 if project['overall_progress'] != 100 else 0
        return (unfinished_bonus
                + _points_by_interval(project['n_tasks'], weight=1)
                + _points_by_interval(project['n_volunteers'], weight=2)
                + _last_activity_points(project) * 10)

    if order_by:
        sort_key = lambda item: item[str(order_by)]
        reverse = desc
    else:
        sort_key = earned_points
        reverse = True
    projects.sort(key=sort_key, reverse=reverse)
    return projects
1✔
555

556

557
def _last_activity_points(project):
1✔
558
    default = datetime(1970, 1, 1, 0, 0).strftime('%Y-%m-%dT%H:%M:%S')
1✔
559
    updated_datetime = (project.get('updated') or default)
1✔
560
    last_activity_datetime = (project.get('last_activity_raw') or default)
1✔
561
    updated_datetime = updated_datetime.split('.')[0]
1✔
562
    last_activity_datetime = last_activity_datetime.split('.')[0]
1✔
563
    updated = datetime.strptime(updated_datetime, '%Y-%m-%dT%H:%M:%S')
1✔
564
    last_activity = datetime.strptime(last_activity_datetime, '%Y-%m-%dT%H:%M:%S')
1✔
565
    most_recent = max(updated, last_activity)
1✔
566

567
    days_since_modified = (datetime.utcnow() - most_recent).days
1✔
568

569
    if days_since_modified < 1:
1✔
570
        return 50
1✔
571
    if days_since_modified < 2:
1✔
572
        return 20
1✔
573
    if days_since_modified < 3:
1✔
574
        return 10
1✔
575
    if days_since_modified < 4:
1✔
576
        return 5
1✔
577
    if days_since_modified > 15:
1✔
578
        return -200
1✔
579
    return 0
1✔
580

581

582
def _points_by_interval(value, weight=1):
1✔
583
    if value > 100:
1✔
584
        return 20 * weight
1✔
585
    if value > 50:
1✔
586
        return 15 * weight
1✔
587
    if value > 20:
1✔
588
        return 10 * weight
1✔
589
    if value > 10:
1✔
590
        return 5 * weight
1✔
591
    if value > 0:
1✔
592
        return 1 * weight
1✔
593
    return 0
1✔
594

595

596
def publish_channel(sentinel, project_short_name, data, type, private=True):
    """Publish in a channel some JSON data as a string.

    The channel is ``channel_private_<short_name>`` or
    ``channel_public_<short_name>`` depending on ``private``; the message is
    the JSON encoding of ``{"type": type, "data": data}`` and is sent
    through ``sentinel.master.publish``.
    """
    visibility = "private" if private else "public"
    channel = "channel_%s_%s" % (visibility, project_short_name)
    payload = json.dumps(dict(type=type, data=data))
    sentinel.master.publish(channel, payload)
1✔
604

605

606
# See https://github.com/flask-restful/flask-restful/issues/332#issuecomment-63155660
607
def fuzzyboolean(value):
    """Convert a boolean or a boolean-like string to ``bool``.

    Booleans are returned as-is. Strings are matched case-insensitively
    against false/no/off/n/0 and true/yes/on/y/1.

    :raises ValueError: for falsy non-boolean input or unrecognized text.
    """
    if isinstance(value, bool):
        return value

    if not value:
        raise ValueError("boolean type must be non-null")

    lowered = value.lower()
    literals = {False: ('false', 'no', 'off', 'n', '0',),
                True: ('true', 'yes', 'on', 'y', '1',)}
    for result in (False, True):
        if lowered in literals[result]:
            return result
    raise ValueError("Invalid literal for boolean(): {}".format(lowered))
1✔
619

620

621
def get_avatar_url(upload_method, avatar, container, external):
    """Return absolute URL for avatar.

    For the ``cloud`` upload method (case-insensitive) the URL is built for
    the endpoint of that name. Any other method uses the
    ``uploads.uploaded_file`` endpoint with ``<container>/<avatar>`` as the
    file name, the configured ``PREFERRED_URL_SCHEME`` and ``external`` as
    the ``_external`` flag.
    """
    method = upload_method.lower()
    if method not in ['cloud']:
        local_filename = container + '/' + avatar
        scheme = current_app.config.get('PREFERRED_URL_SCHEME')
        return url_for('uploads.uploaded_file',
                       filename=local_filename,
                       _scheme=scheme,
                       _external=external)
    return url_for(method, filename=avatar, container=container)
634

635

636
def get_disqus_sso(user):  # pragma: no cover
    """Return a script tag that configures Disqus SSO for ``user``.

    The message, signature, timestamp and public key come from
    ``get_disqus_sso_payload`` and are interpolated into the script.
    """
    message, timestamp, sig, pub_key = get_disqus_sso_payload(user)
    script = """<script type="text/javascript">
    var disqus_config = function() {
        this.page.remote_auth_s3 = "%(message)s %(sig)s %(timestamp)s";
        this.page.api_key = "%(pub_key)s";
    }
    </script>"""
    substitutions = {
        'message': message,
        'timestamp': timestamp,
        'sig': sig,
        'pub_key': pub_key,
    }
    return script % substitutions
651

652

653
def get_disqus_sso_payload(user):
    """Return remote_auth_s3 and api_key for user.

    :returns: ``(message, timestamp, signature, public_key)``, or four
        ``None`` values when the Disqus public or secret key is not set.
        ``message`` is the Base64-encoded JSON user data (an empty object
        for a falsy ``user``) as bytes.
    """
    public_key = current_app.config.get('DISQUS_PUBLIC_KEY')
    secret_key = current_app.config.get('DISQUS_SECRET_KEY')
    if not (public_key and secret_key):
        return None, None, None, None

    profile = {}
    if user:
        profile = {
            'id': user.id,
            'username': user.name,
            'email': user.email_addr,
        }
    # b64encode accepts bytes only
    message = base64.b64encode(simplejson.dumps(profile).encode())
    # timestamp used for signing the message
    timestamp = int(time.time())

    # HMAC constructor accepts bytes parameters
    # NOTE(review): ``message`` is bytes here, so format() embeds its repr
    # (b'...') in the signed text -- confirm this is what the consumer of
    # the signature expects.
    signed_text = '{} {}'.format(message, timestamp).encode()
    sig = hmac.HMAC(secret_key.encode(), signed_text,
                    hashlib.sha1).hexdigest()

    return message, timestamp, sig, public_key
1✔
680

681

682
def exists_materialized_view(db, view):
    """Tell whether a materialized view named ``view`` exists in the current schema.

    The query runs on ``db.slave_session``; the ``exists`` column of the
    first returned row is the answer, and False is returned when there is
    no row at all.
    """
    sql = text('''SELECT EXISTS (
                    SELECT relname
                    FROM pg_catalog.pg_class c JOIN pg_namespace n
                    ON n.oid = c.relnamespace
                    WHERE c.relkind = 'm'
                    AND n.nspname = current_schema()
                    AND c.relname = :view);''')
    rows = db.slave_session.execute(sql, dict(view=view))
    first_row = next(iter(rows), None)
    if first_row is None:
        return False
    return first_row.exists
×
694

695

696
def refresh_materialized_view(db, view):
    """Refresh the materialized view ``view`` and return a status message.

    A concurrent refresh is attempted first; when it fails with a
    ``ProgrammingError`` the transaction is rolled back and a plain refresh
    is executed instead.

    NOTE(review): ``view`` is interpolated into the SQL text, so it must
    come from a trusted source.
    """
    session = db.session
    try:
        concurrent_sql = text('REFRESH MATERIALIZED VIEW CONCURRENTLY %s' % view)
        session.execute(concurrent_sql)
        session.commit()
        return "Materialized view refreshed concurrently"
    except ProgrammingError:
        plain_sql = text('REFRESH MATERIALIZED VIEW %s' % view)
        session.rollback()
        session.execute(plain_sql)
        session.commit()
        return "Materialized view refreshed"
1✔
708

709

710
def generate_invitation_email_for_new_user(user, project_slugs=None):
    """Build the invitation email for a newly created account.

    :param user: mapping with at least an ``email_addr`` entry.
    :param project_slugs: optional project short names; every non-empty one
        becomes a ``<SERVER_URL>/project/<slug>`` link in the email.
    :returns: dict with ``subject``, ``recipients``, ``bcc`` (the current
        user's address when authenticated), ``body`` and ``html``.
    """
    config = current_app.config
    is_qa = config.get('IS_QA')
    server_url = config.get('SERVER_URL')
    user_manual_label = config.get('USER_MANUAL_LABEL')
    user_manual_url = config.get('USER_MANUAL_URL')
    brand = config.get('BRAND')

    project_urls = [server_url + '/project/' + slug
                    for slug in (project_slugs or []) if slug]
    bcc = [current_user.email_addr] if current_user.is_authenticated else []

    msg = {
        'subject': 'New account with {}'.format(brand),
        'recipients': [user['email_addr']],
        'bcc': bcc,
    }
    # Both renderings share the same template context.
    context = dict(user=user, project_urls=project_urls,
                   user_manual_label=user_manual_label,
                   user_manual_url=user_manual_url,
                   server_url=server_url, is_qa=is_qa)
    msg['body'] = render_template('/account/email/newaccount_invite.md',
                                  **context)
    msg['html'] = render_template('/account/email/newaccount_invite.html',
                                  **context)
    return msg
1✔
739

740

741
def generate_invitation_email_for_admins_subadmins(user, access_type):
    """Build the email telling ``user`` about updated account access.

    :returns: dict with ``subject``, ``recipients`` (the user), ``bcc``
        (the current user), ``body`` and ``html``.
    """
    config = current_app.config
    is_qa = config.get('IS_QA')
    server_url = config.get('SERVER_URL')
    admin_manual_label = config.get('ADMIN_MANUAL_LABEL')
    admin_manual_url = config.get('ADMIN_MANUAL_URL')
    brand = config.get('BRAND')

    msg = {
        'subject': 'Account access update on {}'.format(brand),
        'recipients': [user.email_addr],
        'bcc': [current_user.email_addr],
    }
    # Both renderings share the same template context.
    context = dict(username=user.fullname,
                   access_type=access_type,
                   admin_manual_label=admin_manual_label,
                   admin_manual_url=admin_manual_url,
                   server_url=server_url,
                   is_qa=is_qa)
    msg['body'] = render_template('/account/email/adminsubadmin_invite.md',
                                  **context)
    msg['html'] = render_template('/account/email/adminsubadmin_invite.html',
                                  **context)
    return msg
1✔
766

767
def generate_notification_email_for_admins(user, admins_emails, access_type):
    """Build the email announcing that admin permissions were granted.

    :param user: user who received the permissions; ``email_addr`` and
        ``fullname`` are read from it.
    :param admins_emails: list of addresses placed in ``bcc``.
    :param access_type: passed through to the templates unchanged.
    :return: dict with ``subject``, ``recipients``, ``bcc``, ``body`` and
        ``html`` keys.
    """
    config = current_app.config
    context = dict(
        username=user.fullname,
        access_type=access_type,
        server_url=config.get('SERVER_URL'),
        is_qa=config.get('IS_QA'))

    msg = {
        'subject': 'Admin permissions have been granted on {}'.format(
            config.get('BRAND')),
        'recipients': [user.email_addr],
        'bcc': admins_emails,
    }
    msg['body'] = render_template(
        '/account/email/adminnotification.md', **context)
    msg['html'] = render_template(
        '/account/email/adminnotification.html', **context)
    return msg
1✔
788

789

790
def generate_manage_user_email(user, operation):
    """Build the email sent when an account is enabled or disabled.

    For ``'enable'`` the mail goes to the affected user with ``current_user``
    in ``bcc``; for ``'disable'`` it goes to ``current_user`` only.

    :param user: the account being enabled/disabled.
    :param operation: either ``'enable'`` or ``'disable'``.
    :return: dict with ``subject``, ``recipients``, optionally ``bcc``,
        plus rendered ``body`` and ``html``.
    """
    assert user
    assert operation in ['enable', 'disable']

    brand = current_app.config.get('BRAND')
    server_url = current_app.config.get('SERVER_URL')
    subject = 'Account update on {}'.format(brand)

    if operation == 'enable':
        msg_to = user.fullname
        msg_header = '{} Account Enabled'.format(brand)
        msg_text = (
            'Your account {0} with {1} at {2} has been enabled. '
            'You can now login with your account credentials.'
        ).format(user.email_addr, brand, server_url)
        msg = {
            'subject': subject,
            'recipients': [user.email_addr],
            'bcc': [current_user.email_addr],
        }
    elif operation == 'disable':
        msg_to = current_user.fullname
        msg_header = '{} Account Disabled'.format(brand)
        msg_text = (
            'Account {0} with {1} at {2} has been disabled. '
        ).format(user.email_addr, brand, server_url)
        msg = {
            'subject': subject,
            'recipients': [current_user.email_addr],
        }

    if current_app.config.get('IS_QA'):
        msg_header += ' (QA Version)'

    context = dict(username=msg_to, msgHeader=msg_header, msgText=msg_text)
    msg['body'] = render_template('/account/email/manageuser.md', **context)
    msg['html'] = render_template('/account/email/manageuser.html', **context)
    return msg
1✔
826

827

828
def check_password_strength(
        password, min_len=8, max_len=15,
        uppercase=True, lowercase=True,
        numeric=True, special=True, message=""):
    """Validate a password against length and character-class rules.

    :param password: the candidate password.
    :param min_len: minimum length; a falsy value disables the check.
    :param max_len: maximum length; a falsy value disables the check.
    :param uppercase: require at least one of ``A-Z``.
    :param lowercase: require at least one of ``a-z``.
    :param numeric: require at least one digit.
    :param special: require at least one of ``!@$%^&*#``.
    :param message: message returned when a character class is missing.
    :return: ``(True, None)`` on success, otherwise ``(False, reason)``.
    """
    length = len(password)
    if min_len and length < min_len:
        return False, lazy_gettext(
            'Password must have a minimum of {} characters'.format(min_len)
        )
    if max_len and length > max_len:
        return False, lazy_gettext(
            'Password must not exceed {} characters'.format(max_len)
        )

    # (rule enabled?, pattern that must match somewhere in the password)
    character_rules = (
        (uppercase, r'[A-Z]'),
        (lowercase, r'[a-z]'),
        (numeric, r'[0-9]'),
        (special, r'[!@$%^&*#]'),
    )
    for enabled, pattern in character_rules:
        if enabled and not re.search(pattern, password):
            return False, message
    return True, None
1✔
862

863

864
def sample(population, at_least, at_most):
    """Pick a random number of items from ``population`` with replacement.

    :param population: non-empty sequence to choose from.
    :param at_least: minimum number of items returned (inclusive).
    :param at_most: maximum number of items returned (inclusive).
    :return: list of between ``at_least`` and ``at_most`` randomly chosen
        items; the same item may appear more than once.
    """
    rnd = random.SystemRandom()
    # randint() already includes its upper bound, so adding 1 to at_most
    # would allow one item more than the caller asked for.
    n = rnd.randint(at_least, at_most)
    return [rnd.choice(population) for _ in range(n)]
1✔
868

869

870
def generate_password():
    """Generate a random password mixing four character classes.

    Draws lowercase letters, uppercase letters, digits and characters from
    ``!@$%^&*#`` via :func:`sample`, then shuffles them with a
    ``SystemRandom`` instance.

    :return: the generated password string.
    """
    lowers = [chr(code) for code in range(ord('a'), ord('z') + 1)]
    uppers = [letter.upper() for letter in lowers]
    digits = [str(digit) for digit in range(0, 10)]
    special = '!@$%^&*#'

    pieces = (sample(lowers, 3, 6)
              + sample(uppers, 3, 6)
              + sample(digits, 2, 4)
              + sample(special, 2, 4))
    random.SystemRandom().shuffle(pieces)
    return ''.join(pieces)
1✔
885

886

887
def get_s3_bucket_name(url):
    """Extract the bucket name from an ``s3.amazonaws.com`` URL.

    Two URL shapes are recognised:

    * ``http(s)://<bucket>.s3.amazonaws.com/...``
    * ``http(s)://s3.amazonaws.com/<bucket>/...``

    :param url: URL string to inspect.
    :return: the bucket name, or ``None`` when ``url`` matches neither shape.
    """
    # Raw strings with escaped dots: an unescaped "." matches any character,
    # which let hosts such as "bucketxs3.amazonaws.com" pass as S3 URLs.
    patterns = (
        r'^https?://([^.]+)\.s3\.amazonaws\.com',
        r'^https?://s3\.amazonaws\.com/([^/]+)',
    )
    for pattern in patterns:
        found = re.search(pattern, url)
        if found:
            return found.group(1)
    return None
1✔
897

898

899
def valid_or_no_s3_bucket(task_data):
    """Check the S3 URLs found in ``task_data`` against the allowed buckets.

    Only string values of ``task_data`` are inspected. When the
    ``ALLOWED_S3_BUCKETS`` setting is ``None`` no check is performed.

    :param task_data: dict of task fields.
    :return: ``False`` if any string value is an S3 URL whose bucket is not
        in ``ALLOWED_S3_BUCKETS``; ``True`` otherwise.
    """
    allowed_s3_buckets = current_app.config.get('ALLOWED_S3_BUCKETS')
    if allowed_s3_buckets is None:
        # no bucket configured: bucket check is disabled (default)
        return True

    buckets = (get_s3_bucket_name(value)
               for value in task_data.values()
               if isinstance(value, str))
    return not any(bucket is not None and bucket not in allowed_s3_buckets
                   for bucket in buckets)
1✔
913

914

915
def can_update_user_info(current_user, user_to_update):
    """Decide whether ``current_user`` may edit ``user_to_update``.

    :param current_user: acting user (``admin``, ``subadmin``, ``id`` read).
    :param user_to_update: user being edited.
    :return: ``(allowed, disabled_fields, hidden_fields)``. The two dicts are
        only set for a regular user editing their own account; they are
        ``None`` in every other case.
    """
    # admin can update anyone
    if current_user.admin:
        return True, None, None

    editing_self = current_user.id == user_to_update.id

    # subadmin can update self and normal users
    if current_user.subadmin:
        target_is_privileged = user_to_update.admin or user_to_update.subadmin
        return (editing_self or not target_is_privileged), None, None

    # users without admin or subadmin privilege cannot view the 'profile'
    # field or update the 'user_type' field
    if editing_self:
        disable_fields = {
            'user_type': 'You must be an admin or subadmin to edit this.'}
        hidden_fields = {
            'profile': 'You must be admin or subadmin to view this'}
        return True, disable_fields, hidden_fields
    return False, None, None
1✔
929

930

931
def mail_with_enabled_users(message):
    """Restrict a message's addresses to those of enabled users.

    ``message['recipients']`` and ``message['bcc']`` are rewritten in place
    to keep only addresses returned by
    ``user_repo.get_enbled_users_by_email``; a list that ends up empty is
    stored as ``None``.

    :param message: dict with optional ``recipients`` and ``bcc`` lists.
    :return: ``True`` if at least one address remains, ``False`` if the
        message is empty or nobody is left to mail (the message is not
        modified in that case).
    """
    if not message:
        return False

    # ``or []`` also accepts keys explicitly set to None, which is exactly
    # what this function stores for an emptied list, so a message can be
    # passed through more than once without a TypeError.
    recipients = message.get('recipients') or []
    bcc = message.get('bcc') or []
    if not recipients and not bcc:
        return False

    # Imported here rather than at module level, as in the original code.
    from pybossa.core import user_repo

    enabled_users = user_repo.get_enbled_users_by_email(recipients + bcc)
    enabled_emails = {user.email_addr for user in enabled_users}
    recipients = [email for email in recipients if email in enabled_emails]
    bcc = [email for email in bcc if email in enabled_emails]
    if not recipients and not bcc:
        return False

    message['recipients'] = recipients or None
    message['bcc'] = bcc or None
    return True
1✔
953

954

955
def grant_access_with_api_key(secure_app):
    """Attach the user owning the request's API key to the request context.

    The key is taken from the ``Authorization`` header when present;
    otherwise, and only when ``secure_app`` is falsy, from the ``api_key``
    query argument. If an enabled user owns the key, its ``last_login`` is
    refreshed and it is stored on the top of the request context stack.

    :param secure_app: when truthy, the ``api_key`` query argument is ignored.
    :return: None.
    """
    from pybossa.core import user_repo
    import pybossa.model as model
    from flask import _request_ctx_stack

    # The header wins over the query-string argument.
    if 'Authorization' in request.headers:
        apikey = request.headers.get('Authorization')
    elif not secure_app:
        apikey = request.args.get('api_key', None)
    else:
        apikey = None

    if not apikey:
        return

    user = user_repo.get_by(api_key=apikey)
    if not (user and user.enabled):
        return
    user.last_login = model.make_timestamp()
    user_repo.update(user)
    _request_ctx_stack.top.user = user
1✔
971

972

973
def can_have_super_user_access(user):
    """Check a user's email against ``SUPERUSER_WHITELIST_EMAILS``.

    Each whitelist entry is used as a case-insensitive regular expression.
    When a whitelist is configured and no entry matches, the user's
    ``admin`` and ``subadmin`` flags are cleared and the event is logged.

    :param user: user to check; must be truthy.
    :return: ``True`` if no whitelist is configured or an entry matches,
        ``False`` otherwise.
    """
    assert(user)
    wlist_admins = current_app.config.get('SUPERUSER_WHITELIST_EMAILS', None)
    if not wlist_admins:
        return True

    for wl in wlist_admins:
        if re.search(wl, user.email_addr, re.IGNORECASE):
            return True

    user.admin = user.subadmin = False
    current_app.logger.info(
        'User {} {} cannot have admin/subadmin access'.format(
            user.fullname, user.email_addr))
    return False
1✔
984

985

986
def s3_get_file_contents(s3_bucket, s3_path,
                         headers=None, encoding='utf-8', conn=''):
    """Get the contents of a file from S3.

    :param s3_bucket: AWS S3 bucket
    :param s3_path: Path to an S3 object
    :param headers: Additional headers to send
        in the request to S3
    :param encoding: The text encoding to use
    :param conn: Name of the app config entry holding the keyword
        arguments for ``create_connection``; a missing entry means no
        arguments
    :return: File contents as a string with the
        specified encoding
    """
    connection_kwargs = current_app.config.get(conn, {})
    connection = create_connection(**connection_kwargs)
    bucket = connection.get_bucket(s3_bucket, validate=False)
    s3_key = bucket.get_key(s3_path)
    return s3_key.get_contents_as_string(headers=headers, encoding=encoding)
1003

1004

1005
def get_unique_user_preferences(user_prefs):
    """Flatten user preferences into a set of single-value JSON snippets.

    For every preference whose value is a list, one entry per list item is
    produced: the JSON text of ``{key: [item]}`` wrapped in single quotes.
    Preferences with non-list values are ignored.

    :param user_prefs: iterable of preference dicts.
    :return: set of quoted JSON strings.
    """
    return {
        '\'{}\''.format(json.dumps({name: [value]}))
        for user_pref in user_prefs
        for name, values in user_pref.items()
        if isinstance(values, list)
        for value in values
    }
1✔
1014

1015

1016
def get_user_pref_db_clause(user_pref, user_email=None):
    """Build the SQL condition matching tasks to a user's preferences.

    Only preferences whose value is a list are used; each list item becomes
    its own ``task.user_pref @> '<json>'`` test (JSON text lower-cased) and
    the tests are OR-ed together.

    :param user_pref: dict of user preferences.
    :param user_email: when truthy, a condition on the ``assign_user`` key
        using the ``:assign_user`` bind parameter is appended.
    :return: SQL condition string.
    """
    # expand user preferences as per sql format for jsonb datatype
    # single user preference with multiple value or
    # multiple user preferences with single/multiple values
    _valid = ((k, v) for k, v in user_pref.items() if isinstance(v, list))
    user_prefs = [{k: [item]} for k, pref_list in _valid
                  for item in pref_list]
    assign_key = 'assign_user'
    location_key = 'locations'
    language_key = 'languages'

    if not user_prefs:
        user_pref_sql = '''(task.user_pref IS NULL OR task.user_pref = \'{}\' )'''
        if user_email:
            email_sql = ''' OR (task.user_pref->\'{}\' IS NULL AND task.user_pref->\'{}\' IS NULL
                    AND task.user_pref->\'{}\' IS NOT NULL AND task.user_pref @> :assign_user)
                    '''.format(location_key, language_key, assign_key)
    else:
        # json.dumps() leaves single quotes untouched; double them so a
        # preference value cannot terminate the SQL string literal.
        sql = ('task.user_pref @> \'{}\''.format(
                   json.dumps(up).lower().replace("'", "''"))
               for up in user_prefs)
        user_pref_sql = '''( (task.user_pref-> \'{}\' IS NULL AND task.user_pref-> \'{}\' IS NULL) OR ({}) )'''.format(location_key, language_key, ' OR '.join(sql))
        if user_email:
            email_sql = ''' AND (task.user_pref->\'{}\' IS NULL OR task.user_pref @> :assign_user)
                    '''.format(assign_key)

    return user_pref_sql + email_sql if user_email else user_pref_sql
1✔
1042

1043

1044
def get_user_filter_db_clause(user_profile):
    """Build the SQL condition (partially) matching tasks to a user profile.

    Tasks without a worker filter always match. When ``user_profile`` is
    non-empty, tasks whose ``worker_filter`` has any of the profile's keys
    (``?|`` operator) match as well; the result still needs further
    validation by the caller.

    :param user_profile: dict of user profile attributes, or a falsy value.
    :return: SQL condition string.
    """
    sql = "task.worker_filter IS NULL OR task.worker_filter = '{}'"
    if user_profile:
        # Quote each key explicitly instead of relying on the repr() of a
        # Python list: repr switches to double quotes for keys containing a
        # single quote, which is not a valid SQL string literal.
        quoted_keys = ", ".join(
            "'{}'".format(str(key).replace("'", "''"))
            for key in user_profile)
        sql += " OR task.worker_filter ?| ARRAY[{}]::text[]".format(quoted_keys)
    return sql
1✔
1052

1053

1054
def is_int(s):
    """Tell whether ``s`` represents a whole number in the 32-bit int range.

    ``s`` is converted with ``float(str(s))``, so ``5``, ``"5"`` and
    ``"5.0"`` all qualify.

    :param s: value to test.
    :return: ``True`` if the value is integral and within
        ``-2147483648 .. 2147483647``; ``False`` otherwise, including when
        it cannot be parsed as a number.
    """
    try:
        value = float(str(s))
    except (TypeError, ValueError):
        # Not a number. A bare ``except`` here would also swallow
        # KeyboardInterrupt and SystemExit.
        return False
    return value.is_integer() and -2147483648 <= value <= 2147483647
1✔
1060

1061

1062
def validate_required_fields(data):
    """Validate ``data`` against the ``TASK_REQUIRED_FIELDS`` setting.

    Field names are compared case-insensitively. A required field is
    invalid when its value is missing or falsy, when ``check_val`` is set
    and the value is not in ``val``, or when ``require_int`` is set and the
    value contains a ``.`` or fails :func:`is_int`.

    :param data: dict of imported field values.
    :return: list of names of the required fields that are invalid.
    """
    required_fields = current_app.config.get("TASK_REQUIRED_FIELDS", {})

    lowered = {}
    if required_fields.items():
        lowered = {key.lower(): value for key, value in data.items()}

    def _is_invalid(value, field_info):
        if not value:
            return True
        if field_info.get('check_val') and value not in field_info.get('val'):
            return True
        if field_info.get('require_int'):
            return '.' in str(value) or not is_int(value)
        return False

    return [field_name
            for field_name, field_info in required_fields.items()
            if _is_invalid(lowered.get(field_name.lower()), field_info)]
1✔
1076

1077

1078
def get_file_path_for_import_csv(csv_file):
    """Store an uploaded CSV file and return where it was stored.

    With ``S3_IMPORT_BUCKET`` configured the file is uploaded through
    ``s3_upload_file_storage`` into a ``user_<id>`` directory (``user`` when
    there is no current user); otherwise it is saved to a local temporary
    file that is not deleted automatically.

    :param csv_file: uploaded file object; ``save(path)`` is called on it
        for the local case.
    :return: value returned by ``s3_upload_file_storage`` (requested with
        ``return_key_only=True``) or the local temporary file path.
    """
    s3_bucket = current_app.config.get('S3_IMPORT_BUCKET')
    container = 'user_{}'.format(current_user.id) if current_user else 'user'
    if s3_bucket:
        with_encryption = current_app.config.get('ENABLE_ENCRYPTION')
        path = s3_upload_file_storage(s3_bucket, csv_file, directory=container,
            file_type_check=False, return_key_only=True,
            with_encryption=with_encryption, conn_name='S3_IMPORT')
    else:
        # Only the unique name is needed. Close the handle right away so it
        # is not leaked; delete=False keeps the file on disk for save().
        with NamedTemporaryFile(delete=False) as tmpfile:
            path = tmpfile.name
        csv_file.save(path)
    return path
1✔
1091

1092

1093
def get_import_csv_file(path):
    """Open a previously stored import CSV file.

    :param path: S3 key when ``S3_IMPORT_BUCKET`` is configured, otherwise a
        local file path.
    :return: result of ``get_file_from_s3`` for the S3 case, or a file
        object opened with ``open(path)``; the caller owns that handle.
    """
    s3_bucket = current_app.config.get("S3_IMPORT_BUCKET")
    if not s3_bucket:
        return open(path)

    decrypt = current_app.config.get('ENABLE_ENCRYPTION')
    return get_file_from_s3(s3_bucket, path, conn_name='S3_IMPORT',
                            decrypt=decrypt)
×
1101

1102

1103
def delete_import_csv_file(path):
    """Delete a previously stored import CSV file.

    :param path: S3 key when ``S3_IMPORT_BUCKET`` is configured, otherwise a
        local file path.
    :return: None.
    """
    s3_bucket = current_app.config.get("S3_IMPORT_BUCKET")
    if not s3_bucket:
        os.remove(path)
        return
    delete_file_from_s3(s3_bucket, path, conn_name='S3_IMPORT')
×
1109

1110

1111
def sign_task(task):
    """Add a ``signature`` entry to ``task`` when encryption is enabled.

    The signature is ``signer.dumps`` of ``{'task_id': task['id']}``.
    Nothing happens unless the ``ENABLE_ENCRYPTION`` setting is truthy.

    :param task: task dict, modified in place.
    :return: None.
    """
    if not current_app.config.get('ENABLE_ENCRYPTION'):
        return
    from pybossa.core import signer
    task['signature'] = signer.dumps({'task_id': task['id']})
1✔
1116

1117

1118
def get_time_plus_delta_ts(time, **kwargs):
    """Add a time delta to a point in time.

    :param time: a ``datetime``, or an ISO-format string parsed with
        ``datetime.fromisoformat``.
    :param kwargs: keyword arguments forwarded to ``timedelta``.
    :return: ``time`` shifted by the delta.
    """
    start = datetime.fromisoformat(time) if isinstance(time, str) else time
    return start + timedelta(**kwargs)
1✔
1122

1123
def get_taskrun_date_range_sql_clause_params(start_date, end_date):
    """Generate the date clause and SQL params for querying the db.

    :param start_date: lower bound for ``task_run.finish_time``; skipped
        when falsy.
    :param end_date: upper bound for ``task_run.finish_time``; skipped when
        falsy.
    :return: ``(date_clause, sql_params)`` where ``date_clause`` is a string
        of ``AND`` conditions (possibly empty) and ``sql_params`` maps the
        bind parameter names used in it to their values.
    """
    bounds = (
        ('start_date', start_date, " AND task_run.finish_time >=:start_date"),
        ('end_date', end_date, " AND task_run.finish_time <=:end_date"),
    )
    date_clause = ""
    sql_params = {}
    for name, value, condition in bounds:
        if value:
            date_clause += condition
            sql_params[name] = value
    return date_clause, sql_params
1✔
1134

1135

1136
def description_from_long_description(desc, long_desc):
    """Return ``desc`` if set, else derive a description from ``long_desc``.

    The long description is rendered with ``misaka``, stripped of HTML tags
    and cut to 255 characters; when the cut text is 252 characters or
    longer its last three characters are replaced by ``...``. Newlines
    become spaces.

    :param desc: existing short description.
    :param long_desc: long description rendered when ``desc`` is falsy.
    :return: the description, or a single space when the result is empty.
    """
    if desc:
        return desc

    # [:-1] drops the final character of the rendered html
    rendered = misaka.render(long_desc)[:-1]
    text_desc = re.sub('<[^>]*>', "", rendered)[:255]
    if len(text_desc) >= 252:
        text_desc = text_desc[:-3] + "..."
    description = text_desc.replace('\n', " ")
    return description or " "
1✔
1149

1150

1151
def generate_bsso_account_notification(user):
    """Build the email telling admins that an account was created via BSSO.

    When the user's ``metadata`` has no ``user_type`` the warning templates
    are used and the subject gets a ``(missing user type)`` suffix.

    :param user: dict describing the new account; ``fullname`` is used when
        set, otherwise the first items of ``firstName`` and ``lastName``.
    :return: dict with ``subject``, ``recipients`` (the ``ALERT_LIST``
        setting), ``body`` and ``html`` keys.
    """
    config = current_app.config
    warning = not user.get('metadata', {}).get('user_type')

    if warning:
        template_type, warning_msg = 'adminbssowarning', ' (missing user type)'
    else:
        template_type, warning_msg = 'adminbssonotification', ''
    template = '/account/email/{}'.format(template_type)

    msg = {
        'subject': 'A new account has been created via BSSO for {}{}'.format(
            config.get('BRAND'), warning_msg),
        'recipients': config.get('ALERT_LIST', []),
    }

    fullname = user.get('fullname') or user['firstName'][0] + " " + user['lastName'][0]
    context = dict(username=fullname,
                   server_url=config.get('SERVER_URL'),
                   is_qa=config.get('IS_QA'))

    msg['body'] = render_template('{}.md'.format(template), **context)
    msg['html'] = render_template('{}.html'.format(template), **context)
    return msg
1✔
1178

1179

1180
def check_annex_response(response_value):
    """
    Search a response, depth first, for an odfoa response.

    A dict counts as an odfoa response when it has all of the keys
    "version", "source-uri", "odf" and "oa". Only values that are exactly
    of type dict are searched.

    :param response_value: the response (or nested part of it) to inspect.
    :return: the first odfoa response dict found, otherwise None.
    """
    if type(response_value) is not dict:
        return None

    odfoa_keys = ("version", "source-uri", "odf", "oa")
    if all(key in response_value for key in odfoa_keys):
        return response_value

    nested_results = (check_annex_response(child)
                      for child in response_value.values())
    return next((found for found in nested_results if found), None)
1✔
1198

1199

1200
def process_annex_load(tp_code, response_value):
    """
    Process the tp_code so that after annex document is loaded, it also loads
    annotations from the response_value

    Two code shapes are patched:

    * ``<tab>.loadDocument(...)`` / ``<tab>.loadDocumentLite(...)`` gets a
      ``.then(...)`` appended that calls ``<tab>.loadAnnotationFromJson``.
    * ``<shell> = document.getElementById("annex-viewer");`` (or
      ``"shell-container"``) is followed by an event listener that calls
      ``loadAnnotationFromJson`` on the shell's active tab.

    :param tp_code: task presenter code.
    :param response_value: response data, serialized with ``json.dumps``.
    :return: the patched task presenter code.
    """
    # The JSON text ends up inside a single-quoted JavaScript string
    # literal, so backslashes and single quotes are escaped to keep the
    # literal well-formed. NOTE(review): assumes loadAnnotationFromJson
    # receives the JSON as a string - confirm against the Annex API.
    odfoa = (json.dumps(response_value)
             .replace("\\", "\\\\")
             .replace("'", "\\'"))

    def _append_then(match):
        # match[1] is the first group: (\w+), the annex tab variable
        annex_tab = match[1]
        return f"{match[0]}.then(() => {annex_tab}.loadAnnotationFromJson('{odfoa}'))"

    def _append_listener(match):
        shell = match[1]
        return (
            f"{match[0]}"
            f"\n"
            f"    {shell}.addEventListener('urn:bloomberg:annex:event:instance-success', () => {{\n"
            f"      {shell}.internalModel.activeTab().loadAnnotationFromJson('{odfoa}')\n"
            f"    }});"
        )

    # re.sub with a function inserts each snippet right after its own match,
    # so snippets of different length (different variable names) cannot
    # shift one another, unlike manual offset bookkeeping.

    # Looking for loadDocument() or loadDocumentLite() code snippet
    regex = r"(\w+)\.(loadDocument|loadDocumentLite)\s*\(\s*.*\s*(\))"
    tp_code = re.sub(regex, _append_then, tp_code)

    # Looking for code snippet like shell = document.getElementById("annex-viewer"); or
    # shell = document.getElementById("shell-container");
    regex = r"(\w+)\s*=\s*document\.getElementById\s*\(\s*('|\")?(annex-viewer|shell-container)('|\")?\s*\)\s*;"
    return re.sub(regex, _append_listener, tp_code)
1✔
1235

1236

1237
def admin_or_project_owner(user, project):
    """Abort with 403 unless ``user`` is an admin or owns ``project``.

    :param user: user to check; must be authenticated.
    :param project: project whose ``owners_ids`` is consulted.
    :return: None when access is allowed.
    """
    if not user.is_authenticated:
        abort(403)
    if not (user.admin or user.id in project.owners_ids):
        abort(403)
1✔
1240

1241

1242
def process_tp_components(tp_code, user_response):
    """Pre-fill task presenter components with a user's response.

    For every component whose tag is in ``TP_COMPONENT_TAGS`` the
    'pyb-answer' value is used as a key into ``user_response``; the value
    found (or '' when absent) is JSON-encoded into ':initial-value'.

    :param tp_code: task presenter markup.
    :param user_response: dict of responses keyed by 'pyb-answer' value.
    :return: the modified markup, prettified.
    """
    soup = BeautifulSoup(tp_code, 'html.parser')

    # Disable autosave so that response for different users will be different
    for task_presenter in soup.find_all("task-presenter"):
        for attribute in ("auto-save-seconds", "allow-save-work"):
            remove_element_attributes(task_presenter, attribute)

    for tag_name in TP_COMPONENT_TAGS:
        for component in soup.find_all(tag_name):
            answer_key = get_first_existing_data(component, "pyb-answer")
            answer = user_response.get(answer_key, '')
            remove_element_attributes(component, "initial-value")
            component[':initial-value'] = json.dumps(answer)
    return soup.prettify()
1✔
1261

1262

1263
def process_table_component(tp_code, user_response, task):
    """Pre-fill ``table-element`` components with a user's response.

    The element's 'name' is used as a key into ``user_response``. The rows
    found are merged with any data the element already declares (a
    ``task.info...`` path or a list literal) and written to ':data'.

    :param tp_code: task presenter markup.
    :param user_response: dict of responses keyed by table name.
    :param task: task whose ``info`` resolves ``task.info...`` references.
    :return: the modified markup, prettified.
    """
    soup = BeautifulSoup(tp_code, 'html.parser')
    for table_element in soup.find_all("table-element"):
        rows = user_response.get(table_element.get('name'), [])

        # In case the response is a dict, we need to convert it to a list
        # so that the table-element can display the data correctly
        if type(rows) is dict:
            rows = list(rows.values())

        # handle existing data for the response
        data_expr = get_first_existing_data(table_element, "data")
        if data_expr:
            existing_data = []  # holds request fields or declared data
            if data_expr.startswith("task.info"):  # e.g. task.info.a.b.c
                request_fields = task.info
                # skip "task" and "info"
                for attribute in data_expr.split(".")[2:]:
                    request_fields = request_fields.get(attribute, {})
                if type(request_fields) is not list:
                    # NOTE(review): this leaves the loop over all table
                    # elements, not only the current one - confirm intent.
                    break
                existing_data = request_fields
            elif data_expr.strip().startswith("["):  # if it is a list
                try:
                    # finditer scans the string as it was on entry; the
                    # rebinding below does not affect the running scan.
                    for m in re.finditer(r"task\.info(\.\w+)+", data_expr):
                        reference = m.group(0)
                        data_expr = data_expr.replace(
                            reference, extract_task_info_data(task, reference))
                    existing_data = json.loads(data_expr)
                except Exception as e:
                    current_app.logger.error(
                        'parsing initial_data_str: {} error: {}'.format(
                            data_expr, str(e)))

            # merge existing_data into the response rows
            shared = min(len(existing_data), len(rows))
            for index in range(shared):
                existing_row = existing_data[index]
                for key in existing_row.keys():
                    # only fill in keys the response row does not have
                    if key not in rows[index]:
                        rows[index][key] = existing_row[key]

            # if existing_data is longer: take whatever left in it
            if len(existing_data) > len(rows):
                rows.extend(existing_data[len(rows):])

        # Remove attributes like :data, v-bind:data and data before appending
        remove_element_attributes(table_element, "data")
        table_element[':data'] = json.dumps(rows)

        # replace any initial-value attribute with
        # :initial-value="props.row.COL_NAME" so the table shows the row data
        answer_tags = table_element.find_all(
            lambda tag: "pyb-table-answer" in tag.attrs)
        for answer_tag in answer_tags:
            remove_element_attributes(answer_tag, "initial-value")
            column_name = answer_tag.get("pyb-table-answer", "")
            answer_tag[":initial-value"] = "props.row." + column_name
    return soup.prettify()
1✔
1328

1329

1330
def get_first_existing_data(page_element, attribute):
    """
    Return the first truthy value among the variants of an attribute.

    For attribute "data" the variants are checked in this order:
    ":data", "v-bind:data", "data".

    :param page_element: A PageElement.
    :param attribute: an attribute of the page_element.
    :return: the first truthy value found, or None
    :rtype: string
    """
    for name in (f":{attribute}", f"v-bind:{attribute}", attribute):
        value = page_element.get(name)
        if value:
            return value
    return None
1✔
1345

1346

1347
def remove_element_attributes(page_element, attribute):
    """
    Delete every variant of an attribute from a page_element.

    For attribute "data" the variants are ":data", "v-bind:data" and
    "data"; variants the element does not have are skipped.

    :param page_element: A PageElement.
    :param attribute: an attribute of the page_element.
    :return: None.
    """
    for prefix in (":", "v-bind:", ""):
        name = f"{prefix}{attribute}"
        if page_element.has_attr(name):
            del page_element[name]
1✔
1361

1362

1363
def extract_task_info_data(task, task_info_str):
    """
    Resolve a javascript-style ``task.info`` path against a task.
    e.g. task.info.a.b -> task.info.get('a').get('b')

    A missing key resolves to an empty dict.

    :param task: task object
    :param task_info_str: javascript task info string
    :return: JSON string
    """
    # the first two segments are "task" and "info"
    path = task_info_str.split(".")[2:]
    value = task.info
    for segment in path:
        value = value.get(segment, {})
    return json.dumps(value)
1✔
1377

1378
def get_user_saved_partial_tasks(sentinel, project_id, user_id, task_repo=None):
    """
    Get the ids of the tasks a user has saved partial answers for, read
    from Redis keys matching PARTIAL_ANSWER_KEY.

    When "task_repo" is passed, keys whose task is not returned by
    ``task_repo.bulk_query`` are deleted and left out of the result.

    :param sentinel: object exposing ``slave`` (scan/ttl) and ``master``
        (delete) Redis clients.
    :param project_id: project id used in the key pattern.
    :param user_id: user id used in the key pattern.
    :param task_repo: optional repository used to filter deleted tasks.
    :return: dict mapping task id to the ttl of its Redis key.
    """
    pattern = PARTIAL_ANSWER_KEY.format(project_id=project_id,
                                        user_id=user_id,
                                        task_id="*")
    batch_size = 10000
    raw_keys = list(sentinel.slave.scan_iter(match=pattern, count=batch_size))

    ttl_by_task = {}
    key_by_task = {}
    for raw_key in raw_keys:
        try:
            key = raw_key.decode('utf-8')
            # the task id is the last ':'-separated part of the key
            task_id = int(key.split(':')[-1])
            ttl_by_task[task_id] = sentinel.slave.ttl(key)
            key_by_task[task_id] = key
        except Exception as e:
            error_msg = 'Parsing get_user_saved_partial_tasks error: {}'
            current_app.logger.error(error_msg.format(str(e)))

    if not task_repo:
        return ttl_by_task

    # Filter existing tasks
    task_ids = task_repo.bulk_query(list(ttl_by_task.keys()),
                                    return_only_task_id=True)
    for task_id, key in key_by_task.items():
        if task_id not in task_ids:
            sentinel.master.delete(key)
    return {task_id: ttl
            for task_id, ttl in ttl_by_task.items()
            if task_id in task_ids}
1✔
1412

1413
def delete_redis_keys(sentinel, pattern):
    """
    Delete the Redis keys matching the pattern passed.

    Keys are scanned on ``sentinel.slave`` and deleted on
    ``sentinel.master`` in a single call.

    :param sentinel: object exposing ``slave`` and ``master`` clients.
    :param pattern: key pattern handed to ``scan_iter``.
    :return: False when no key matched, otherwise the truthiness of the
        value returned by ``delete``.
    """
    matched_keys = list(sentinel.slave.scan_iter(match=pattern, count=10000))
    if matched_keys:
        return bool(sentinel.master.delete(*matched_keys))
    return False
1✔
1422

1423

1424
def map_locations(locations):
    """Split locations into country codes and names, mapping each to the other.

    A two-character location is treated as a country code, anything else as
    a country name. Each one is also translated to its counterpart through
    ``app_settings.upref_mdata``; a warning is logged when no counterpart
    is found.

    :param locations: iterable of country codes/names, or None.
    :return: dict with ``locations`` (codes and names combined),
        ``country_codes`` and ``country_names`` lists; all three are None
        when ``locations`` is None.
    """
    if locations is None:
        return dict.fromkeys(('locations', 'country_codes', 'country_names'))

    codes = set()
    names = set()
    for location in locations:
        if len(location) == 2:
            kind, own, counterpart = 'code', codes, names
            mapped = app_settings.upref_mdata.get_country_name_by_country_code(location)
        else:
            kind, own, counterpart = 'name', names, codes
            mapped = app_settings.upref_mdata.get_country_code_by_country_name(location)

        own.add(location)
        if mapped is None:
            current_app.logger.warning(f"Invalid country {kind} '{location}' in map_locations")
        else:
            counterpart.add(mapped)

    return {
        'locations': list(codes.union(names)),
        'country_codes': list(codes),
        'country_names': list(names)
    }
1456

1457

1458
def get_last_name(fullname):
    """
    Returns the last name from a fullname, ignoring parenthesis and digits. Used for sorting.

    :param fullname: full name, e.g. "Jane Doe (ai)" or "Jane Doe 2"; may
        be empty or None.
    :return: the last whitespace-separated part of the name, or the part
        before it when the last one is all digits. A name consisting of a
        single numeric part is returned unchanged; an empty name gives ''.
    """
    if not fullname:
        return ''

    # Remove content within parentheses in name: Jane Doe (ai)
    cleaned_name = re.sub(r'\s*\(.*?\)', '', fullname)
    # split() without a separator collapses runs of whitespace, so repeated
    # spaces or tabs cannot produce empty name parts.
    full_name_parts = cleaned_name.split()
    if not full_name_parts:
        return ''

    # Check if the last part is a number and use the second last if available.
    if full_name_parts[-1].isdigit():
        return full_name_parts[-2] if len(full_name_parts) > 1 else fullname
    return full_name_parts[-1]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc