• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bloomberg / pybossa / 19114618549

05 Nov 2025 07:59PM UTC coverage: 94.093% (+0.03%) from 94.065%
19114618549

Pull #1069

github

peterkle
add more tests
Pull Request #1069: RDISCROWD-8392 upgrade to boto3

214 of 223 new or added lines in 7 files covered. (95.96%)

152 existing lines in 8 files now uncovered.

17920 of 19045 relevant lines covered (94.09%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.03
/pybossa/jobs.py
1
# -*- coding: utf8 -*-
2
# This file is part of PYBOSSA.
3
#
4
# Copyright (C) 2015 Scifabric LTD.
5
#
6
# PYBOSSA is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# PYBOSSA is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with PYBOSSA.  If not, see <http://www.gnu.org/licenses/>.
18
"""Jobs module for running background tasks in PYBOSSA server."""
1✔
19
import json
1✔
20
import math
1✔
21
import os
1✔
22
import time
1✔
23
from collections import OrderedDict
1✔
24
from datetime import datetime, timedelta
1✔
25
from io import BytesIO
1✔
26
from zipfile import ZipFile
1✔
27

28
import pandas as pd
1✔
29
import requests
1✔
30
from flask import current_app, render_template
1✔
31
from flask_mail import Message, Attachment
1✔
32
from rq.timeouts import JobTimeoutException
1✔
33
from sqlalchemy.sql import text
1✔
34

35
import pybossa.app_settings as app_settings
1✔
36
import pybossa.cache.users as cached_users
1✔
37
import pybossa.dashboard.jobs as dashboard
1✔
38
from pybossa.auditlogger import AuditLogger
1✔
39
from pybossa.cache import site_stats
1✔
40
from pybossa.cache.helpers import n_available_tasks
1✔
41
from pybossa.cache.users import get_users_for_report
1✔
42
from pybossa.cloud_store_api.connection import create_connection
1✔
43
from pybossa.core import mail, task_repo, importer
1✔
44
from pybossa.core import user_repo, auditlog_repo
1✔
45
from pybossa.leaderboard.jobs import leaderboard
1✔
46
from pybossa.model.webhook import Webhook
1✔
47
from pybossa.util import with_cache_disabled, publish_channel, \
1✔
48
    mail_with_enabled_users
49
from pybossa.core import email_service
1✔
50
from pybossa.cloud_store_api.s3 import upload_email_attachment
1✔
51

52
MINUTE = 60
1✔
53
IMPORT_TASKS_TIMEOUT = (20 * MINUTE)
1✔
54
TASK_DELETE_TIMEOUT = (60 * MINUTE)
1✔
55
EXPORT_TASKS_TIMEOUT = (20 * MINUTE)
1✔
56
MAX_RECIPIENTS = 50
1✔
57
BATCH_DELETE_TASK_DELAY = 2 # seconds
1✔
58
BATCH_SIZE_BULK_DELETE_TASKS = 100
1✔
59
MAX_BULK_DELETE_TASK_ITERATIONS = 1000
1✔
60

61
from pybossa.exporter.json_export import JsonExporter
1✔
62

63
auditlogger = AuditLogger(auditlog_repo, caller='web')
1✔
64

65
DUMMY_ENVIRON = {'wsgi.url_scheme': "", 'SERVER_PORT': "", 'SERVER_NAME': "", 'REQUEST_METHOD': ""}
1✔
66

67

68
def schedule_job(function, scheduler):
    """Schedule a job and return a log message.

    ``function`` is a dict with the keys: name (the callable), args,
    kwargs, interval, timeout and optionally scheduled_time.  If an
    equivalent job is already registered, it is cancelled and a warning
    message is returned instead of scheduling a duplicate.
    """
    func_name = function['name'].__name__
    for existing in scheduler.get_jobs():
        same_job = (func_name in existing.description and
                    existing.args == function['args'] and
                    existing.kwargs == function['kwargs'])
        if same_job:
            existing.cancel()
            return ('WARNING: Job %s(%s, %s) is already scheduled'
                    % (func_name, function['args'], function['kwargs']))
    # Not scheduled yet: register it with the scheduler.
    scheduler.schedule(
        scheduled_time=(function.get('scheduled_time') or datetime.utcnow()),
        func=function['name'],
        args=function['args'],
        kwargs=function['kwargs'],
        interval=function['interval'],
        repeat=None,
        timeout=function['timeout'])

    return ('Scheduled %s(%s, %s) to run every %s seconds'
            % (func_name, function['args'], function['kwargs'],
               function['interval']))
94

95

96
def get_quarterly_date(now):
    """Return the last day of *now*'s quarter, keeping the time of day.

    :param now: a ``datetime`` used as the reference point.
    :returns: a ``datetime`` on the final day of the current quarter
        (31 Mar, 30 Jun, 30 Sep or 31 Dec) at the same time as ``now``.
    :raises TypeError: if ``now`` is not a ``datetime``.
    """
    if not isinstance(now, datetime):
        # Bug fix: the original message rendered type(datetime), i.e.
        # <class 'type'>, instead of the expected datetime class.
        raise TypeError('Expected %s, got %s' % (datetime, type(now)))
    # Round the month up to the end of its quarter (3, 6, 9 or 12).
    execute_month = int(math.ceil(now.month / 3.0) * 3)
    execute_day = 31 if execute_month in [3, 12] else 30
    execute_date = datetime(now.year, execute_month, execute_day)
    return datetime.combine(execute_date, now.time())
104

105

106
def get_saturday_4pm_date(now):
    """Return the upcoming Saturday at 16:00 for weekly execution jobs.

    If *now* already falls on a Saturday the same date is used, with the
    time normalized to 16:00:00 regardless of the current time of day.

    :param now: a ``datetime`` used as the reference point.
    :raises TypeError: if ``now`` is not a ``datetime``.
    """
    if not isinstance(now, datetime):
        # Bug fix: the original message rendered type(datetime), i.e.
        # <class 'type'>, instead of the expected datetime class.
        raise TypeError('Expected %s, got %s' % (datetime, type(now)))
    # Mon - 0, Tue - 1, Wed - 2, Thurs - 3, Fri - 4, Sat - 5, Sun - 6
    SATURDAY = 5
    DAYS_IN_WEEK = 7
    offset = (SATURDAY - now.weekday()) % DAYS_IN_WEEK
    saturday = now + timedelta(days=offset)
    # Pin the time to 16:00 directly instead of the original
    # strftime/strptime round-trip; the result is identical.
    return saturday.replace(hour=16, minute=0, second=0, microsecond=0)
118

119

120
def enqueue_job(job):
    """Enqueue a single job dict (keys: name, args, kwargs, timeout, queue)."""
    from pybossa.core import sentinel
    from rq import Queue
    target_queue = Queue(job['queue'], connection=sentinel.master)
    target_queue.enqueue_call(func=job['name'],
                              args=job['args'],
                              kwargs=job['kwargs'],
                              timeout=job['timeout'])
    return True
131

132
def enqueue_periodic_jobs(queue_name):
    """Enqueue all PYBOSSA periodic jobs belonging to ``queue_name``."""
    from pybossa.core import sentinel
    from rq import Queue

    queue = Queue(queue_name, connection=sentinel.master)
    enqueued = 0
    for job in get_periodic_jobs(queue_name):
        if job['queue'] != queue_name:
            continue
        queue.enqueue_call(func=job['name'],
                           args=job['args'],
                           kwargs=job['kwargs'],
                           timeout=job['timeout'])
        enqueued += 1
    return "%s jobs in %s have been enqueued" % (enqueued, queue_name)
150

151

152
def get_periodic_jobs(queue):
    """Return a generator of periodic jobs for a given queue.

    A job is a dict with the keys: name, args, kwargs, timeout, queue.
    Jobs are grouped by queue; the final generator filters on the queue
    name.  Note: 'quaterly' (sic) is the historical queue name used by
    the engagement jobs.
    """
    job_groups = [get_default_jobs()]
    if queue == 'low':
        job_groups.append(get_weekly_admin_report_jobs())
    if queue in ('super', 'high'):
        job_groups.append(get_project_jobs(queue))
    if queue == 'low':
        job_groups.append(get_autoimport_jobs())
    if queue == 'quaterly':
        # User engagement jobs
        job_groups.append(get_inactive_users_jobs())
        job_groups.append(get_non_contributors_users_jobs())
    if queue == 'low':
        job_groups.append(get_dashboard_jobs())
        job_groups.append(get_weekly_stats_update_projects())
    if queue == 'maintenance':
        job_groups.append(get_maintenance_jobs())
    if queue == 'super':
        job_groups.append(get_leaderboard_jobs())
    # TODO: add get_completed_tasks_cleaup_jobs() for the 'weekly' queue
    # in a future PR.
    return (job for group in job_groups for job in group
            if job['queue'] == queue)
176

177

178
def get_default_jobs():  # pragma: no cover
    """Yield the default periodic jobs scheduled for every deployment."""
    timeout = current_app.config.get('TIMEOUT')
    yield dict(name=warm_up_stats, args=[], kwargs={},
               timeout=timeout, queue='high')
    # Warn inactive project owners only when unpublishing is enabled.
    if current_app.config.get('UNPUBLISH_PROJECTS'):
        yield dict(name=warn_old_project_owners, args=[], kwargs={},
                   timeout=timeout, queue='low')
    # Cache warming gets double the standard timeout.
    yield dict(name=warm_cache, args=[], kwargs={},
               timeout=2 * timeout, queue='super')
    for task_func, task_queue in ((news, 'low'),
                                  (disable_users_job, 'low'),
                                  (send_email_notifications, 'super')):
        yield dict(name=task_func, args=[], kwargs={},
                   timeout=timeout, queue=task_queue)
195

196

197
def get_maintenance_jobs():
    """Yield maintenance-queue jobs (currently only the failed-job check)."""
    yield dict(name=check_failed, args=[], kwargs={},
               timeout=current_app.config.get('TIMEOUT'),
               queue='maintenance')
202

203

204
def get_export_task_jobs(queue):
    """Yield one project_export job per project for ``queue``.

    When the 'updated_exports' pro-feature is enabled, pro users'
    projects are exported on the 'high' queue and other published
    projects elsewhere; otherwise every published project is exported.
    """
    from pybossa.core import project_repo
    import pybossa.cache.projects as cached_projects
    from pybossa.pro_features import ProFeatureHandler
    feature_handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
    timeout = current_app.config.get('TIMEOUT')
    if not feature_handler.only_for_pro('updated_exports'):
        projects = (p.dictize() for p in project_repo.filter_by(published=True))
    elif queue == 'high':
        projects = cached_projects.get_from_pro_user()
    else:
        projects = (p.dictize() for p in project_repo.filter_by(published=True)
                    if p.owner.pro is False)
    for project in projects:
        yield dict(name=project_export,
                   args=[project.get('id')], kwargs={},
                   timeout=timeout,
                   queue=queue)
226

227

228
def project_export(_id):
    """Pre-generate the JSON and CSV export zip files for one project.

    Does nothing if no project exists with the given id.
    """
    from pybossa.core import project_repo, json_exporter, csv_exporter
    project = project_repo.get(_id)
    if project is None:
        return
    json_exporter.pregenerate_zip_files(project)
    csv_exporter.pregenerate_zip_files(project)
236

237

238
def get_project_jobs(queue):
    """Yield get_project_stats jobs for a queue.

    'super' targets pro users' projects, 'high' recently updated ones;
    any other queue yields nothing.
    """
    from pybossa.cache import projects as cached_projects
    timeout = current_app.config.get('TIMEOUT')
    if queue == 'super':
        candidates = cached_projects.get_from_pro_user()
    elif queue == 'high':
        candidates = cached_projects.get_recently_updated_projects()
    else:
        candidates = []
    for project in candidates:
        yield dict(name=get_project_stats,
                   args=[project.get('id'), project.get('short_name')],
                   kwargs={},
                   timeout=timeout,
                   queue=queue)
256

257

258
def create_dict_jobs(data, function, timeout, queue='low'):
    """Yield one job dict per entry in ``data``.

    Each entry must provide 'id' and 'short_name', which become the
    positional args of the scheduled ``function``.
    """
    for entry in data:
        yield dict(name=function,
                   args=[entry['id'], entry['short_name']],
                   kwargs={},
                   timeout=timeout,
                   queue=queue)
266

267

268
def get_inactive_users_jobs(queue='quaterly'):
    """Yield 'we miss you' e-mail jobs for users inactive for 3-12 months."""
    from sqlalchemy.sql import text
    from pybossa.model.user import User
    from pybossa.core import db
    # Users whose last contribution is older than 3 months but newer
    # than 12 months.
    sql = text('''SELECT user_id FROM task_run
               WHERE user_id IS NOT NULL
               AND to_date(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US')
               >= NOW() - '12 month'::INTERVAL
               AND to_date(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US')
               < NOW() - '3 month'::INTERVAL
               GROUP BY user_id ORDER BY user_id;''')
    results = db.slave_session.execute(sql)
    timeout = current_app.config.get('TIMEOUT')

    for row in results:
        user = User.query.get(row.user_id)
        # Skip users who unsubscribed or restricted their account.
        if not (user.subscribed and user.restrict is False):
            continue
        subject = "We miss you!"
        body = render_template('/account/email/inactive.md',
                               user=user.dictize(),
                               config=current_app.config)
        html = render_template('/account/email/inactive.html',
                               user=user.dictize(),
                               config=current_app.config)
        mail_dict = dict(recipients=[user.email_addr],
                         subject=subject,
                         body=body,
                         html=html)
        yield dict(name=send_mail,
                   args=[mail_dict],
                   kwargs={},
                   timeout=timeout,
                   queue=queue)
309

310

311
def get_dashboard_jobs(queue='low'):  # pragma: no cover
    """Yield the jobs that refresh the admin dashboard statistics."""
    timeout = current_app.config.get('TIMEOUT')
    dashboard_funcs = (dashboard.active_users_week,
                       dashboard.active_anon_week,
                       dashboard.draft_projects_week,
                       dashboard.published_projects_week,
                       dashboard.update_projects_week,
                       dashboard.new_tasks_week,
                       dashboard.new_task_runs_week,
                       dashboard.new_users_week,
                       dashboard.returning_users_week)
    for func in dashboard_funcs:
        yield dict(name=func, args=[], kwargs={},
                   timeout=timeout, queue=queue)
332

333

334
def get_leaderboard_jobs(queue='super'):  # pragma: no cover
    """Yield leaderboard refresh jobs, one per configured leaderboard key."""
    timeout = current_app.config.get('TIMEOUT')
    for leaderboard_key in (current_app.config.get('LEADERBOARDS') or []):
        yield dict(name=leaderboard, args=[],
                   kwargs={'info': leaderboard_key},
                   timeout=timeout, queue=queue)
    # Default leaderboard (no info key).
    yield dict(name=leaderboard, args=[], kwargs={},
               timeout=timeout, queue=queue)
344

345

346
def get_non_contributors_users_jobs(queue='quaterly'):
    """Yield e-mail jobs nudging users who never contributed to a project."""
    from sqlalchemy.sql import text
    from pybossa.model.user import User
    from pybossa.core import db
    # Users with an account but no task runs at all.
    sql = text('''SELECT id FROM "user" WHERE
               NOT EXISTS (SELECT user_id FROM task_run
               WHERE task_run.user_id="user".id)''')
    results = db.slave_session.execute(sql)
    timeout = current_app.config.get('TIMEOUT')
    for row in results:
        user = User.query.get(row.id)
        # Skip users who unsubscribed or restricted their account.
        if not (user.subscribed and user.restrict is False):
            continue
        subject = "Why don't you help us?!"
        body = render_template('/account/email/noncontributors.md',
                               user=user.dictize(),
                               config=current_app.config)
        html = render_template('/account/email/noncontributors.html',
                               user=user.dictize(),
                               config=current_app.config)
        mail_dict = dict(recipients=[user.email_addr],
                         subject=subject,
                         body=body,
                         html=html)
        yield dict(name=send_mail,
                   args=[mail_dict],
                   kwargs={},
                   timeout=timeout,
                   queue=queue)
379

380

381
def get_autoimport_jobs(queue='low'):
    """Yield import_tasks jobs for every project with an autoimporter."""
    from pybossa.core import project_repo
    import pybossa.cache.projects as cached_projects
    from pybossa.pro_features import ProFeatureHandler

    feature_handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
    timeout = current_app.config.get('TIMEOUT')

    if feature_handler.only_for_pro('autoimporter'):
        # Autoimport is a pro feature: only pro users' projects qualify.
        candidates = cached_projects.get_from_pro_user()
    else:
        candidates = (p.dictize() for p in project_repo.get_all())
    for candidate in candidates:
        project = project_repo.get(candidate['id'])
        if not project.has_autoimporter():
            continue
        yield dict(name=import_tasks,
                   args=[project.id, True],
                   kwargs=project.get_autoimporter(),
                   timeout=timeout,
                   queue=queue)
403

404

405
# The following are the actual jobs (i.e. tasks performed in the background)
406

407
@with_cache_disabled
def get_project_stats(_id, short_name):  # pragma: no cover
    """Get stats for project.

    Runs inside a dummy request context so cache helpers that expect a
    Flask request can execute from a background worker.

    :param _id: project id whose statistics are refreshed.
    :param short_name: project short name (not used by the current body).
    """
    with current_app.request_context(DUMMY_ENVIRON):
        import pybossa.cache.project_stats as stats

        # cached_projects.get_project(short_name)
        stats.update_stats(_id)
415

416

417
@with_cache_disabled
def warm_up_stats():  # pragma: no cover
    """Background job that pre-computes the site-wide statistics caches."""
    from pybossa.cache.site_stats import (n_auth_users, n_anon_users,
                                          n_tasks_site, n_total_tasks_site,
                                          n_task_runs_site,
                                          get_top5_projects_24_hours,
                                          get_top5_users_24_hours)
    current_app.logger.info('warm_up_stats - n_auth_users')
    n_auth_users()
    # Anonymous-user stats are skipped when anonymous access is disabled.
    if not app_settings.config.get('DISABLE_ANONYMOUS_ACCESS'):
        n_anon_users()
    warmers = ((n_tasks_site, 'warm_up_stats - n_tasks_site'),
               (n_total_tasks_site, 'warm_up_stats - n_total_tasks_site'),
               (n_task_runs_site, 'warm_up_stats - n_task_runs_site'),
               (get_top5_projects_24_hours,
                'warm_up_stats - get_top5_projects_24_hours'),
               (get_top5_users_24_hours,
                'warm_up_stats - get_top5_users_24_hours'))
    for warmer, log_line in warmers:
        current_app.logger.info(log_line)
        warmer()

    return True
442

443

444
@with_cache_disabled
def warm_cache():  # pragma: no cover
    """Background job to warm cache.

    Builds a standalone app (no server) and, inside a dummy request
    context, touches the cached views for top/featured/categorized
    projects and for leaderboard users so their caches are populated.
    """
    from pybossa.core import create_app
    app = create_app(run_as_server=False)
    # Project ids already handled during this run (dedup only).
    projects_cached = []

    with app.request_context(DUMMY_ENVIRON):
        import pybossa.cache.projects as cached_projects
        import pybossa.cache.categories as cached_cat
        from pybossa.util import rank
        from pybossa.core import user_repo

        def warm_project(_id, short_name, featured=False):
            # Records the project id once; the stats refresh itself is
            # intentionally disabled here (done by get_project_jobs).
            if _id not in projects_cached:
                # stats.update_stats(_id)  # duplicate work of get_project_jobs
                projects_cached.append(_id)

        start = time.time()
        # Cache top projects
        projects = cached_projects.get_top()
        for p in projects:
            current_app.logger.info('warm_project - top projects. id {} short_name: {}'
                .format(p['id'], p['short_name']))
            warm_project(p['id'], p['short_name'])

        # Cache 3 pages
        to_cache = 3 * app.config['APPS_PER_PAGE']
        projects = rank(cached_projects.get_all_featured('featured'))[:to_cache]
        for p in projects:
            current_app.logger.info('warm_project - ranked project. id {} short_name{}'
                .format(p['id'], p['short_name']))
            warm_project(p['id'], p['short_name'], featured=True)

        # Categories
        categories = cached_cat.get_used()
        for c in categories:
            projects = rank(cached_projects.get_all(c['short_name']))[:to_cache]
            for p in projects:
                current_app.logger.info('warm_project - categories->rank project. id {} short_name{}'
                    .format(p['id'], p['short_name']))
                warm_project(p['id'], p['short_name'])

        current_app.logger.info(f'warm_project - completed {len(projects_cached)} projects in {time.time() - start} seconds')

        # Users
        users = cached_users.get_leaderboard(app.config['LEADERBOARD'])
        current_app.logger.info(f'warm_project - get_leaderboard for {len(users)} users')
        for user in users:
            u = user_repo.get_by_name(user['name'])
            cached_users.get_user_summary(user['name'])
            current_app.logger.info(
                f"warm_project - user get_user_summary: name { user['name']} "
                f"in {time.time() - start} seconds")

            cached_users.projects_contributed_cached(u.id)
            current_app.logger.info(
                f"warm_project - user projects_contributed_cached: id {u.id} "
                f"in {time.time() - start} seconds")

            cached_users.published_projects_cached(u.id)
            current_app.logger.info(
                f"warm_project - user published_projects_cached: id {u.id} "
                f"in {time.time() - start} seconds")

            cached_users.draft_projects_cached(u.id)
            current_app.logger.info(
                f"warm_project - user draft_projects_cached: id {u.id} "
                f"in {time.time() - start} seconds")

        return True
515

516

517
def get_non_updated_projects():
    """Return published, uncontacted projects idle for 3+ months.

    Projects that already have completed tasks are excluded.
    """
    from sqlalchemy.sql import text
    from pybossa.model.project import Project
    from pybossa.core import db
    sql = text('''SELECT id FROM project WHERE TO_DATE(updated,
                'YYYY-MM-DD\THH24:MI:SS.US') <= NOW() - '3 month':: INTERVAL
               AND contacted != True AND published = True
               AND project.id NOT IN
               (SELECT task.project_id FROM task
               WHERE task.state='completed'
               GROUP BY task.project_id)''')
    results = db.slave_session.execute(sql)
    return [Project.query.get(row.id) for row in results]
535

536

537
def warn_old_project_owners():
    """E-mail the owners of projects not updated in the last 3 months.

    Each contacted project is flagged as contacted and unpublished.
    Returns True when every project was processed; False as soon as a
    recipient is refused or an owner has not consented/subscribed.
    """
    from smtplib import SMTPRecipientsRefused
    from pybossa.core import mail, project_repo
    from pybossa.cache.projects import clean
    from flask_mail import Message

    projects = get_non_updated_projects()

    with mail.connect() as conn:
        for project in projects:
            # NOTE(review): a non-consenting owner aborts the whole run
            # instead of being skipped; behavior preserved — confirm intent.
            if not (project.owner.consent and project.owner.subscribed):
                return False
            subject = ('Your %s project: %s has been inactive'
                       % (current_app.config.get('BRAND'), project.name))
            body = render_template('/account/email/inactive_project.md',
                                   project=project)
            html = render_template('/account/email/inactive_project.html',
                                   project=project)
            message = Message(recipients=[project.owner.email_addr],
                              subject=subject,
                              body=body,
                              html=html)
            try:
                conn.send(message)
                project.contacted = True
                project.published = False
                clean(project.id)
                project_repo.update(project)
            except SMTPRecipientsRefused:
                return False
    return True
570

571

572
def disable_users_job():
    """Disable user accounts whose inactivity exceeds the configured limit.

    The limit defaults to 3 months (STALE_USERS_MONTHS).  Users whose
    e-mail domain appears in EXTENDED_STALE_USERS_DOMAINS are never
    disabled.  Disabled users are logged; always returns True.
    """
    from sqlalchemy.sql import text
    from pybossa.model.user import User
    from pybossa.core import db, user_repo

    # default user deactivation time
    user_interval = current_app.config.get('STALE_USERS_MONTHS') or 3
    # domains that are in extended users category
    ext_user_domains = current_app.config.get('EXTENDED_STALE_USERS_DOMAINS') or []

    if not ext_user_domains:
        where = 'u.inactivity > interval \'{} month\''.format(user_interval)
    else:
        # never disable extended users
        ext_users_filter = ' OR '.join('(u.email_addr LIKE \'%{}\')'.format(domain) for domain in ext_user_domains)
        where = '''((u.inactivity > interval '{} month') AND NOT ({}))'''.format(user_interval, ext_users_filter)

    sql = text('''
        SELECT id FROM (
            SELECT id, enabled, email_addr, (current_timestamp - to_timestamp(last_login, 'YYYY-MM-DD"T"HH24:MI:SS.US')) AS inactivity
            FROM "user"
        ) u
        WHERE ({}) AND u.enabled = true;
    '''.format(where))
    results = db.slave_session.execute(sql)

    users_disabled = []
    for row in results:
        user = User.query.get(row.id)
        user.enabled = False
        user_repo.update(user)
        users_disabled.append('name: {}, id: {}, email: {}, last_login: {}'.format(
            user.name, user.id, user.email_addr, user.last_login))

    if users_disabled:
        current_app.logger.info('disable_users_job has disabled following {} users\n{}'
            .format(len(users_disabled), ', '.join(users_disabled)))
    return True
611

612

613
def send_mail(message_dict, mail_all=False):
    """Send an e-mail described by ``message_dict``.

    :param message_dict: kwargs for flask_mail.Message (recipients,
        subject, body, html, ...).
    :param mail_all: when True, bypass the enabled-users filter.

    If any recipient's domain appears in the SPAM config nothing is
    sent.  When the external email service is enabled, '+alias' local
    parts are normalized and duplicate recipients removed before
    delivery; otherwise flask-mail is used.
    """
    if not (mail_all or mail_with_enabled_users(message_dict)):
        return
    message = Message(**message_dict)
    spam = False
    for r in message_dict['recipients']:
        # Bug fix: use rsplit with maxsplit so an address containing
        # more than one '@' cannot raise ValueError; only the domain
        # (the part after the last '@') matters for the SPAM check.
        domain = r.rsplit('@', 1)[-1]
        if domain in current_app.config.get('SPAM', []):
            spam = True
            break
    if spam:
        return
    if email_service.enabled:
        # Normalize email aliases in recipients
        recipients = []
        for r in message_dict.get("recipients", []):
            if "+" in r:
                local, domain = r.split("@", 1)
                local = local.split("+", 1)[0]
                r = f"{local}@{domain}"
            recipients.append(r)
        # Remove duplicates (dict preserves first-seen order)
        recipients = list(dict.fromkeys(recipients))
        message_dict["recipients"] = recipients
        current_app.logger.info("Send email calling email_service %s", message_dict)
        email_service.send(message_dict)
    else:
        current_app.logger.info("Send email calling flask.mail %s", message_dict)
        mail.send(message)
642

643

644
def count_records(table, task_ids):
    """Return the number of rows in ``table`` referencing ``task_ids``.

    :param table: table name ("task_run" or "result"); internal callers
        only — the name is interpolated into the SQL, so it must never
        come from user input.
    :param task_ids: iterable of task ids.
    :returns: the matching row count as an int.
    """
    # Guard the empty case up front: "IN ()" is invalid SQL and there
    # is nothing to count anyway.
    task_ids_tuple = tuple(task_ids)
    if not task_ids_tuple:
        return 0

    from pybossa.core import db

    sql = f"SELECT COUNT(*) FROM {table} WHERE task_id IN :taskids;"
    return db.session.execute(sql, {"taskids": task_ids_tuple}).scalar()
651

652

653
def count_rows_to_delete(task_ids):
    """Return (task, task_run, result) row counts affected by deletion.

    :param task_ids: sized collection of task ids staged for deletion.
    """
    return (len(task_ids),
            count_records("task_run", task_ids),
            count_records("result", task_ids))
658

659

660
def cleanup_task_records(task_ids, force_reset):
    """Delete the given tasks (and, with force_reset, their runs/results).

    :param task_ids: sized collection of task ids to delete.
    :param force_reset: when True also purge task_run and result rows;
        otherwise only the task table is touched.

    Child tables are deleted first so no orphaned task_run/result rows
    are left behind; each table's delete is committed separately.
    """
    # Guard the empty case: "IN ()" is invalid SQL and there is nothing
    # to delete anyway.  (Also removes the unused get_task_filters import
    # the original carried.)
    task_ids_tuple = tuple(task_ids)
    if not task_ids_tuple:
        return

    from pybossa.core import db

    tables = ["result", "task_run", "task"] if force_reset else ["task"]
    current_app.logger.info("Task ids staged for deletion: %s", task_ids)
    for table in tables:
        sql = f"DELETE FROM {table} "
        # task is keyed by id; child tables reference task_id.
        sql += "WHERE id IN :taskids;" if table == "task" else "WHERE task_id IN :taskids;"
        db.session.execute(sql, {"taskids": task_ids_tuple})
        db.session.commit()

    current_app.logger.info("Total %d tasks deleted from db tables %s", len(task_ids), tables)
676

677
def get_tasks_to_delete(project_id, task_filter_args):
    """Return the ids of the project's tasks matching the browse filters.

    :param project_id: the project whose tasks are staged.
    :param task_filter_args: filter arguments understood by
        get_task_filters; its conditions are appended to the WHERE clause.
    """
    from pybossa.core import db
    from pybossa.cache.task_browse_helpers import get_task_filters

    conditions, params = get_task_filters(task_filter_args)
    sql = text('''
            SELECT task.id as id,
                coalesce(ct, 0) as n_task_runs, task.n_answers, ft,
                priority_0, task.created
                FROM task LEFT OUTER JOIN
                (SELECT task_id, CAST(COUNT(id) AS FLOAT) AS ct,
                MAX(finish_time) as ft FROM task_run
                WHERE project_id=:project_id GROUP BY task_id) AS log_counts
                ON task.id=log_counts.task_id
                WHERE task.project_id=:project_id {}
                ORDER BY task.id;
            '''.format(conditions))
    rows = db.bulkdel_session.execute(sql, dict(project_id=project_id, **params)).fetchall()
    # The first column of each row is the task id.
    return [row[0] for row in rows]
697

698

699
def delete_bulk_tasks_in_batches(project_id, force_reset, task_filter_args):
    """Delete bulk tasks in batches from project.

    Tasks matching the filters (and, when force_reset is set, their
    task_run and result rows) are removed in fixed-size batches, with a
    short pause between batches so other queries can run.

    :param project_id: id of the project to purge tasks from.
    :param force_reset: when True delete dependent result/task_run rows too.
    :param task_filter_args: task-browse filters selecting which tasks go.
    """
    current_app.logger.info("Deleting tasks in batches for project %d", project_id)
    batch_size = BATCH_SIZE_BULK_DELETE_TASKS
    task_ids = get_tasks_to_delete(project_id, task_filter_args)
    total_task, total_taskrun, total_result = count_rows_to_delete(task_ids)
    current_app.logger.info("total records to delete. task %d, task_run %d, result %d",
                            total_task, total_taskrun, total_result)

    # Batches needed per table: ceiling division instead of the manual
    # int()/modulo dance.
    result_iterations = math.ceil(total_result / batch_size)
    taskrun_iterations = math.ceil(total_taskrun / batch_size)
    task_iterations = math.ceil(total_task / batch_size)
    current_app.logger.info("total iterations. task %d, task_run %d, result %d",
                            task_iterations, taskrun_iterations, result_iterations)

    # Enough batches to cover the largest table, capped so a single job
    # cannot run unbounded.
    total_iterations = max(result_iterations, taskrun_iterations, task_iterations)
    total_iterations = min(total_iterations, MAX_BULK_DELETE_TASK_ITERATIONS)
    for i in range(total_iterations):
        start_position = i * batch_size
        end_position = start_position + batch_size
        batched_task_ids = task_ids[start_position:end_position]
        cleanup_task_records(batched_task_ids, force_reset)
        time.sleep(BATCH_DELETE_TASK_DELAY)  # allow sql queries other than delete records to execute
    current_app.logger.info("Completed deleting tasks in batches for project %d", project_id)
1✔
726

727

728
def delete_bulk_tasks_with_session_repl(project_id, force_reset, task_filter_args):
    """Delete a project's tasks in a single transaction.

    Runs one SQL script that locks the affected rows, collects the ids to
    delete into a temp table, and deletes from the dependent tables before
    the parent `task` table. Executed on the `bulkdel` session whose db
    user has session_replication_role privileges so FK triggers are
    bypassed during the deletes.

    :param project_id: id of the project to purge tasks from.
    :param force_reset: when True delete tasks plus their task_runs and
        results (honoring the browse filters); when False delete only
        tasks that have no result rows.
    :param task_filter_args: task-browse filters (used only with
        force_reset=True).
    """
    from pybossa.core import db
    from pybossa.cache.task_browse_helpers import get_task_filters

    # bulkdel db conn is with db user having session_replication_role
    # when bulkdel is not configured, make explicit sql query to set
    # session replication role to replica
    sql_session_repl = ''
    if not 'bulkdel' in current_app.config.get('SQLALCHEMY_BINDS'):
        sql_session_repl = 'SET session_replication_role TO replica;'

    # lock tasks for given project with SELECT FOR UPDATE
    # create temp table with all tasks to be deleted
    # during transaction, disable constraints check with session_replication_role
    # delete rows from child tables first and then from parent
    if not force_reset:
        """Delete only tasks that have no results associated."""
        params = {}
        sql = text('''
                BEGIN;
                SELECT task_id FROM task_run WHERE project_id=:project_id FOR UPDATE;
                SELECT id FROM task WHERE project_id=:project_id FOR UPDATE;

                {}

                CREATE TEMP TABLE to_delete ON COMMIT DROP AS (
                    SELECT task.id as id FROM task WHERE project_id=:project_id
                    AND task.id NOT IN
                    (SELECT task_id FROM result
                    WHERE result.project_id=:project_id
                    GROUP BY result.task_id)
                );

                DELETE FROM task_run WHERE project_id=:project_id
                        AND task_id IN (SELECT id FROM to_delete);
                DELETE FROM task WHERE project_id=:project_id
                        AND id IN (SELECT id FROM to_delete);

                COMMIT;
                '''.format(sql_session_repl))
    else:
        # force_reset: honor the browse filters; `conditions` is spliced
        # into the WHERE clause, bound values travel in `params`.
        conditions, params = get_task_filters(task_filter_args)
        sql = text('''
                BEGIN;
                SELECT task_id FROM result WHERE project_id=:project_id FOR UPDATE;
                SELECT task_id FROM task_run WHERE project_id=:project_id FOR UPDATE;
                SELECT id FROM task WHERE project_id=:project_id FOR UPDATE;

                {}

                CREATE TEMP TABLE to_delete ON COMMIT DROP AS (
                    SELECT task.id as id,
                    coalesce(ct, 0) as n_task_runs, task.n_answers, ft,
                    priority_0, task.created
                    FROM task LEFT OUTER JOIN
                    (SELECT task_id, CAST(COUNT(id) AS FLOAT) AS ct,
                    MAX(finish_time) as ft FROM task_run
                    WHERE project_id=:project_id GROUP BY task_id) AS log_counts
                    ON task.id=log_counts.task_id
                    WHERE task.project_id=:project_id {}
                );

                DELETE FROM result WHERE project_id=:project_id
                       AND task_id in (SELECT id FROM to_delete);
                DELETE FROM task_run WHERE project_id=:project_id
                       AND task_id in (SELECT id FROM to_delete);
                DELETE FROM task WHERE task.project_id=:project_id
                       AND id in (SELECT id FROM to_delete);

                COMMIT;
                '''.format(sql_session_repl, conditions))
    db.bulkdel_session.execute(sql, dict(project_id=project_id, **params))
1✔
800

801
def delete_bulk_tasks(data):
    """Delete tasks in bulk from project."""
    import pybossa.cache.projects as cached_projects

    # Unpack the job payload up front; missing keys fail fast here.
    project_id = data['project_id']
    project_name = data['project_name']
    curr_user = data['curr_user']
    coowners = data['coowners']
    fullname = data['current_user_fullname']
    force_reset = data['force_reset']
    url = data['url']
    filters = data.get('filters', {})

    # Pick the deletion strategy from configuration.
    if current_app.config.get("SESSION_REPLICATION_ROLE_DISABLED"):
        delete_bulk_tasks_in_batches(project_id, force_reset, filters)
    else:
        delete_bulk_tasks_with_session_repl(project_id, force_reset, filters)

    cached_projects.clean_project(project_id)

    # Notify owner and co-owners about what was removed.
    if force_reset:
        msg = ("Tasks, taskruns and results associated have been "
               "deleted from project {0} on {1} as requested by {2}"
               .format(project_name, url, fullname))
    else:
        msg = ("Tasks and taskruns with no associated results have been "
            "deleted from project {0} by {1}"
            .format(project_name, fullname))

    subject = 'Tasks deletion from %s' % project_name
    body = 'Hello,\n\n' + msg + '\n\nThe %s team.' % current_app.config.get('BRAND')

    recipients = [curr_user] + [user.email_addr for user in coowners]

    send_mail(dict(recipients=recipients, subject=subject, body=body))
    check_and_send_task_notifications(project_id)
1✔
841

842

843
def send_email_notifications():
    """Email recent contributors of projects flagged as updated in Redis.

    Drains the 'updated_project_ids' Redis hash (project_id -> timestamp),
    and for each project with email notifications enabled, mails the
    relevant contributors in chunks of MAX_RECIPIENTS.

    :return: True when the scan completes.
    """
    from pybossa.core import sentinel
    from pybossa.cache import projects as cached_projects
    from pybossa.core import project_repo
    from pybossa.sched import Schedulers

    redis_conn = sentinel.master
    project_set = redis_conn.hgetall('updated_project_ids') or {}
    for project_id, timestamp in project_set.items():
        # data from Redis client in Python3 returns bytes
        project_id = project_id.decode()
        timestamp = timestamp.decode()

        project = project_repo.get(project_id)
        # Remove the entry first so the project is not re-processed even
        # if no email ends up being sent.
        redis_conn.hdel('updated_project_ids', project_id)
        if not project.email_notif:
            continue
        user_emails = []
        # user_pref/task_queue schedulers: only contributors whose prefs
        # match, active since `timestamp`; otherwise any recent
        # contributor while the project is still incomplete.
        if cached_projects.get_project_scheduler(project_id) in [Schedulers.user_pref, Schedulers.task_queue]:
            user_emails = user_repo.get_user_pref_recent_contributor_emails(project_id, timestamp)
        else:
            if cached_projects.overall_progress(project_id) != 100:
                user_emails = user_repo.get_recent_contributor_emails(project_id)

        if user_emails:
            # De-duplicate while preserving order.
            recipients = []
            for email_addr in user_emails:
                if email_addr not in recipients:
                    recipients.append(email_addr)
            subject = ('New Tasks have been imported to {}'.format(project.name))
            body = 'Hello,\n\nThere have been new tasks uploaded to the previously finished project, {0}. ' \
                   '\nLog on to {1} to complete any available tasks.' \
                .format(project.name, current_app.config.get('BRAND'))
            # Send in chunks to respect the per-mail recipient cap.
            recipients_chunk = [recipients[x : x + MAX_RECIPIENTS]
                                for x in range(0, len(recipients), MAX_RECIPIENTS)]
            for group in recipients_chunk:
                mail_dict = dict(recipients=group, subject=subject, body=body)
                send_mail(mail_dict)
    return True
1✔
882

883

884
def _num_tasks_imported(project_id):
    """Count the project's tasks created within the import-timeout window.

    Used after an import times out to estimate how many tasks made it in
    before the job was killed.

    :param project_id: id of the project whose recent tasks are counted.
    :return: number of task rows created in the last
        IMPORT_TASKS_TIMEOUT + 10 seconds.
    """
    from sqlalchemy.sql import text
    from pybossa.core import db
    # NOTE(review): `:seconds` sits inside the quoted INTERVAL literal —
    # confirm the driver substitutes bind params within that string;
    # otherwise the interval would be the literal ':seconds seconds'.
    sql = text('''
        SELECT COUNT(*) FROM task WHERE
        project_id=:project_id AND
            clock_timestamp()
                - to_timestamp(task.created, 'YYYY-MM-DD"T"HH24:MI:SS.US')
            < INTERVAL ':seconds seconds'
        ''')
    params = dict(seconds=IMPORT_TASKS_TIMEOUT + 10, project_id=project_id)
    return db.session.execute(sql, params).scalar()
1✔
896

897

898
def import_tasks(project_id, current_user_fullname, from_auto=False, **form_data):
    """Import tasks for a project.

    Creates tasks via the configured importer, then emails the project
    owners the outcome. Timeouts and other failures are reported by mail
    and re-raised so the job is recorded as failed.

    :param project_id: id of the target project.
    :param current_user_fullname: display name used in notification mails.
    :param from_auto: True when triggered by the autoimporter; persists
        the import metadata back onto the project.
    :param form_data: importer-specific parameters.
    :return: the importer's status message.
    """
    from pybossa.core import project_repo, user_repo
    import pybossa.cache.projects as cached_projects

    current_app.logger.info("Importing tasks for project %d", project_id)
    project = project_repo.get(project_id)
    # All owners (owner + co-owners) get the status email.
    recipients = []
    for user in user_repo.get_users(project.owners_ids):
        recipients.append(user.email_addr)

    try:
        # test_request_context: importer code expects a request context
        # even when running inside a background job.
        with current_app.test_request_context():
            report = importer.create_tasks(task_repo, project, **form_data)
    except JobTimeoutException:
        # rq killed the job; roll back the half-done transaction and
        # report how many tasks were committed before the timeout.
        from pybossa.core import db
        db.session.rollback()
        n_tasks = _num_tasks_imported(project_id)
        subject = 'Your import task has timed out'
        body = '\n'.join(
            ['Hello,\n',
             'Import task to your project {} by {} failed because the file was too large.',
             'It was able to process approximately {} tasks.',
             'Please break up your task upload into smaller CSV files.',
             'Thank you,\n',
             'The {} team.']).format(project.name, current_user_fullname,
                                     n_tasks, current_app.config.get('BRAND'))
        mail_dict = dict(recipients=recipients, subject=subject, body=body)
        send_mail(mail_dict)
        raise
    except Exception as e:
        # Any other failure: mail the error, then re-raise for rq.
        msg = ('Import tasks to your project {} by {} failed. Error: {}'
               .format(project.name, current_user_fullname, str(e)))
        subject = 'Tasks Import to your project %s' % project.name
        body = ('Hello,\n\n{0}\n\nPlease contact {1} administrator,\nThe {1} team.'
                .format(msg, current_app.config.get('BRAND')))
        mail_dict = dict(recipients=recipients, subject=subject, body=body)
        send_mail(mail_dict)
        raise

    # Success path: refresh caches and notify subscribed contributors.
    cached_projects.delete_browse_tasks(project_id)
    check_and_send_task_notifications(project_id)
    if from_auto:
        form_data['last_import_meta'] = report.metadata
        project.set_autoimporter(form_data)
        project_repo.save(project)
    msg = report.message + ' to your project {0} by {1}.'.format(project.name, current_user_fullname)
    current_app.logger.info("Task import status %s", msg)
    subject = 'Tasks Import to your project %s' % project.name
    body = 'Hello,\n\n' + msg + '\n\nAll the best,\nThe %s team.'\
        % current_app.config.get('BRAND')
    mail_dict = dict(recipients=recipients, subject=subject, body=body)
    send_mail(mail_dict)
    return msg
1✔
952

953

954
def export_tasks(current_user_email_addr, short_name,
                 ty, expanded, filetype, filters=None, disclose_gold=False):
    """Export tasks/taskruns from a project.

    Builds a zip export and delivers it to the requesting user by email,
    either as an attachment, as an email-service upload link, or via an
    S3 pre-signed URL, depending on size limits and configuration.

    :param current_user_email_addr: recipient of the result email.
    :param short_name: short name of the project to export.
    :param ty: export type ('task', 'task_run', 'consensus', ...).
    :param expanded: whether to include expanded task data.
    :param filetype: 'json' or 'csv'.
    :param filters: optional task filters limiting the export.
    :param disclose_gold: include gold answers when True.
    :return: human-readable success message.
    :raises Exception: re-raises any failure after mailing an error notice.
    """
    from pybossa.core import (task_csv_exporter, task_json_exporter,
                              project_repo)
    import pybossa.exporter.consensus_exporter as export_consensus
    project = project_repo.get_by_shortname(short_name)
    current_app.logger.info(f"exporting tasks for project {project.id}")

    try:
        # Export data and upload .zip file locally
        if ty == 'consensus':
            export_fn = getattr(export_consensus,
                                'export_consensus_{}'.format(filetype))
        elif filetype == 'json':
            export_fn = task_json_exporter.make_zip
        elif filetype == 'csv':
            export_fn = task_csv_exporter.make_zip
        else:
            # Unsupported filetype: fall through to the failure email.
            export_fn = None

        mail_dict = dict(recipients=[current_user_email_addr])
        expires_in = current_app.config.get('EXPORT_EXPIRY', 60 * 60 * 12)  # default 12 hours
        # Construct message
        if export_fn is not None:
            mail_dict['subject'] = 'Data exported for your project: {0}'.format(project.name)
            with export_fn(project, ty, expanded, filters, disclose_gold) as fp:
                filename = fp.filename
                content = fp.read()

            bucket_name = current_app.config.get('EXPORT_BUCKET')
            max_email_size = current_app.config.get('EXPORT_MAX_EMAIL_SIZE', float('Inf'))
            max_s3_upload_size = current_app.config.get('EXPORT_MAX_UPLOAD_SIZE', float('Inf'))

            if len(content) > max_s3_upload_size and bucket_name:
                # Too big even for S3 upload: refuse and ask for a
                # smaller export.
                current_app.logger.info("Task export project id %s: Task export exceeded max size %d, actual size: %d",
                                        project.id, max_s3_upload_size, len(content))
                mail_dict['subject'] = 'Data export exceeded max file size: {0}'.format(project.name)
                msg = '<p>Your export exceeded the maximum file upload size. ' + \
                    'Please try again with a smaller subset of tasks'
            elif len(content) > max_email_size and bucket_name:
                # Too big to attach: upload to S3 and mail a signed URL.
                # NOTE(review): get_bucket/new_key/set_contents_from_string
                # is a boto2-style interface — confirm create_connection's
                # wrapper still provides it after the boto3 upgrade.
                current_app.logger.info("uploading exporting tasks to s3 for project, %s", project.id)
                conn_kwargs = current_app.config.get('S3_EXPORT_CONN', {})
                conn = create_connection(**conn_kwargs)
                bucket = conn.get_bucket(bucket_name, validate=False)
                timestamp = datetime.utcnow().isoformat()
                key = bucket.new_key('{}-{}'.format(timestamp, filename))
                key.set_contents_from_string(content)
                url = key.generate_url(expires_in)
                current_app.logger.info("Task export project id %s: Exported file uploaded to s3 %s",
                                        project.id, url)
                msg = '<p>You can download your file <a href="{}">here</a>.</p>'.format(url)
            else:
                if email_service.enabled:
                    # Email service cannot carry attachments: store the
                    # file and send a download link instead.
                    current_app.logger.info("Uploading email attachment to s3. user email %s, project id %d",
                                            current_user_email_addr, project.id)
                    expiration_date = (datetime.now() + timedelta(days=90)).strftime('%a, %d %b %Y %H:%M:%S GMT')
                    url = upload_email_attachment(content, filename, current_user_email_addr, project.id)
                    msg = f'<p>You can download your file <a href="{url}">here</a> until {expiration_date}.</p>'
                    current_app.logger.info("Task export project id %s. Email service export_task attachment link %s", project.id, url)
                else:
                    # Small enough: attach the zip to the email directly.
                    msg = '<p>Your exported data is attached.</p>'
                    mail_dict['attachments'] = [Attachment(filename, "application/zip", content)]
                    current_app.logger.info("Task export project id %s. Exported file attached to email to send",
                                            project.id)
        else:
            # Failure email
            mail_dict['subject'] = 'Data export failed for your project: {0}'.format(project.name)
            msg = '<p>There was an issue with your export. ' + \
                  'Please try again or report this issue ' + \
                  'to a {0} administrator.</p>'
            msg = msg.format(current_app.config.get('BRAND'))

        # Deliver via the configured channel.
        if email_service.enabled:
            mail_dict["body"] = f'\nHello,\n{msg}\nThe {current_app.config.get("BRAND")} team\n'
            current_app.logger.info("Send email calling email_service. %s", mail_dict)
            email_service.send(mail_dict)
        else:
            body = '<p>Hello,</p>' + msg + '<p>The {0} team.</p>'
            body = body.format(current_app.config.get('BRAND'))
            mail_dict['html'] = body
            message = Message(**mail_dict)
            mail.send(message)
        current_app.logger.info(
            'Email sent successfully - Project: %s', project.name)
        job_response = '{0} {1} file was successfully exported for: {2}'
        return job_response.format(
                ty.capitalize(), filetype.upper(), project.name)
    except Exception as e:
        # Any failure above: notify the user that delivery failed, then
        # re-raise so the job is marked failed.
        current_app.logger.exception(
                'Export email failed - Project: %s, exception: %s',
                project.name, str(e))
        subject = 'Email delivery failed for your project: {0}'.format(project.name)
        msg = 'There was an error when attempting to deliver your data export via email.'
        body = 'Hello,\n\n' + msg + '\n\nThe {0} team.'
        body = body.format(current_app.config.get('BRAND'))
        mail_dict = dict(recipients=[current_user_email_addr],
                         subject=subject,
                         body=body)
        message = Message(**mail_dict)
        if email_service.enabled:
            current_app.logger.info("Sending error email for export tasks using email_service. %r", mail_dict)
            email_service.send(mail_dict)
        else:
            mail.send(message)
        raise
1✔
1060

1061

1062
def webhook(url, payload=None, oid=None, rerun=False):
    """Post to a webhook.

    Sends the payload to the given URL, stores the outcome on a Webhook
    record (new, or the existing one when `oid` is given), alerts admins
    when a published project's webhook fails, and optionally publishes
    the result over SSE.

    :param url: webhook endpoint; falsy URL is recorded as a connection error.
    :param payload: dict with at least 'project_id' and 'project_short_name'.
    :param oid: id of an existing Webhook record to update (rerun case).
    :param rerun: when True, adds a 'rerun' query param to the POST.
    :return: the Webhook object.
    """
    from flask import current_app
    from readability.readability import Document
    # NOTE(review): if payload is None/malformed, `project` and `webhook`
    # are never bound, and the `finally` block below would raise
    # NameError — confirm callers always supply a valid payload.
    try:
        import json
        from pybossa.core import sentinel, webhook_repo, project_repo
        project = project_repo.get(payload['project_id'])
        headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
        if oid:
            webhook = webhook_repo.get(oid)
        else:
            webhook = Webhook(project_id=payload['project_id'],
                              payload=payload)
        if url:
            params = dict()
            if rerun:
                params['rerun'] = True
            response = requests.post(url, params=params,
                                     data=json.dumps(payload),
                                     headers=headers)
            # Store a readability-extracted summary of the response body.
            webhook.response = Document(response.text).summary()
            webhook.response_status_code = response.status_code
        else:
            # No URL configured: treat as a connection failure below.
            raise requests.exceptions.ConnectionError('Not URL')
        if oid:
            webhook_repo.update(webhook)
            webhook = webhook_repo.get(oid)
        else:
            webhook_repo.save(webhook)
    except requests.exceptions.ConnectionError:
        webhook.response = 'Connection Error'
        webhook.response_status_code = None
        webhook_repo.save(webhook)
    finally:
        # Alert admins whenever a published project's webhook did not
        # return HTTP 200.
        if project.published and webhook.response_status_code != 200 and current_app.config.get('ADMINS'):
            subject = "Broken: %s webhook failed" % project.name
            body = 'Sorry, but the webhook failed'
            mail_dict = dict(recipients=current_app.config.get('ADMINS'),
                             subject=subject, body=body, html=webhook.response)
            send_mail(mail_dict)
    if current_app.config.get('SSE'):
        publish_channel(sentinel, payload['project_short_name'],
                        data=webhook.dictize(), type='webhook',
                        private=True)
    return webhook
1✔
1108

1109

1110
def notify_blog_users(blog_id, project_id, queue='high'):
    """Send email with new blog post.

    Queues one mail job per subscribed, non-restricted contributor of the
    project, unless the 'notify_blog_updates' pro feature gates it off
    for non-pro owners of non-featured projects.

    :param blog_id: id of the blog post to announce.
    :param project_id: id of the project whose contributors are notified.
    :param queue: rq queue name for the mail jobs.
    :return: summary string with the number of users notified.
    """
    from sqlalchemy.sql import text
    from pybossa.core import db
    from pybossa.core import blog_repo
    from pybossa.pro_features import ProFeatureHandler

    blog = blog_repo.get(blog_id)
    users = 0
    feature_handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
    only_pros = feature_handler.only_for_pro('notify_blog_updates')
    timeout = current_app.config.get('TIMEOUT')
    # Featured projects always notify; otherwise the owner must be pro
    # when the feature is pro-only.
    if blog.project.featured or (blog.project.owner.pro or not only_pros):
        sql = text('''
                   SELECT email_addr, name from "user", task_run
                   WHERE task_run.project_id=:project_id
                   AND task_run.user_id="user".id
                   AND "user".subscribed=true
                   AND "user".restrict=false
                   GROUP BY email_addr, name, subscribed;
                   ''')
        results = db.slave_session.execute(sql, dict(project_id=project_id))
        for row in results:
            subject = "Project Update: %s by %s" % (blog.project.name,
                                                    blog.project.owner.fullname)
            body = render_template('/account/email/blogupdate.md',
                                   user_name=row.name,
                                   blog=blog,
                                   config=current_app.config)
            html = render_template('/account/email/blogupdate.html',
                                   user_name=row.name,
                                   blog=blog,
                                   config=current_app.config)
            mail_dict = dict(recipients=[row.email_addr],
                             subject=subject,
                             body=body,
                             html=html)

            # One background mail job per recipient.
            job = dict(name=send_mail,
                       args=[mail_dict],
                       kwargs={},
                       timeout=timeout,
                       queue=queue)
            enqueue_job(job)
            users += 1
    msg = "%s users notified by email" % users
    return msg
1✔
1157

1158

1159
def notify_task_progress(info, email_addr, queue='high'):
    """Queue an email reminding recipients how many tasks remain incomplete.

    :param info: dict with 'project_name' and 'n_available_tasks'.
    :param email_addr: recipients value passed straight to the mailer.
    :param queue: rq queue name for the mail job.
    """
    project_name = info['project_name']
    remaining = info['n_available_tasks']

    subject = "Project progress reminder for {}".format(project_name)
    msg = ("There are only {} tasks left as incomplete in your project {}.\n"
           "          ").format(remaining, project_name)
    body = 'Hello,\n\n{}\nThe {} team.'.format(msg, current_app.config.get('BRAND'))

    payload = dict(recipients=email_addr,
                   subject=subject,
                   body=body)
    enqueue_job(dict(name=send_mail,
                     args=[payload],
                     kwargs={},
                     timeout=current_app.config.get('TIMEOUT'),
                     queue=queue))
1✔
1178

1179
def get_weekly_admin_report_jobs():
    """Return email jobs with weekly report to admins.

    Jobs are produced only when today's weekday matches the configured
    WEEKLY_ADMIN_REPORTS value and WEEKLY_ADMIN_REPORTS_EMAIL recipients
    exist; otherwise an empty iterator is returned.

    :return: iterator of job dicts (project report + all-users export).
    """
    # Guard .lower() against a missing config value; comparison below is
    # case-insensitive.
    send_emails_date = (current_app.config.get('WEEKLY_ADMIN_REPORTS') or '').lower()
    recipients = current_app.config.get('WEEKLY_ADMIN_REPORTS_EMAIL')
    today = datetime.today().strftime('%A').lower()
    timeout = current_app.config.get('TIMEOUT')
    current_app.logger.info('Checking weekly report for admins, scheduled date: {}, today: {}'
                            .format(send_emails_date, today))
    jobs = []
    if recipients and today == send_emails_date:
        info = dict(timestamp=datetime.now().isoformat(),
            user_id=0, # user_id=0 indicates auto-generated report for admins
            # BUG FIX: the original `get('SERVER_URL') or '' + '/project/'`
            # parsed as `SERVER_URL or ('' + '/project/')`, dropping the
            # '/project/' suffix whenever SERVER_URL was set.
            base_url=(current_app.config.get('SERVER_URL') or '') + '/project/')
        project_report = dict(name=mail_project_report,
                    args=[info, recipients],
                    kwargs={},
                    timeout=timeout,
                    queue='low')
        fmt = 'csv'
        user_report = dict(name=export_all_users,
                    args=[fmt, recipients],
                    kwargs={},
                    timeout=timeout,
                    queue='low')
        jobs = [project_report, user_report]
    return iter(jobs)
1✔
1205

1206
def get_weekly_stats_update_projects():
    """Return email jobs with weekly stats update for project owner.

    Generator: on the configured weekday (WEEKLY_UPDATE_STATS) it yields
    one low-priority job per incomplete project whose owner is subscribed
    (and pro, when the feature is pro-only), plus every featured project.
    """
    from sqlalchemy.sql import text
    from pybossa.core import db
    from pybossa.pro_features import ProFeatureHandler

    feature_handler = ProFeatureHandler(current_app.config.get('PRO_FEATURES'))
    only_pros = feature_handler.only_for_pro('project_weekly_report')
    # Extra predicate spliced into the SQL when the report is pro-only.
    only_pros_sql = 'AND "user".pro=true' if only_pros else ''
    send_emails_date = current_app.config.get('WEEKLY_UPDATE_STATS')
    today = datetime.today().strftime('%A').lower()
    timeout = current_app.config.get('TIMEOUT')
    # `today` is already lowercase; the second .lower() is a harmless no-op.
    if today.lower() == send_emails_date.lower():
        sql = text('''
                   SELECT project.id
                   FROM project, "user", task
                   WHERE "user".id=project.owner_id %s
                   AND "user".subscribed=true
                   AND "user".restrict=false
                   AND task.project_id=project.id
                   AND task.state!='completed'
                   UNION
                   SELECT project.id
                   FROM project
                   WHERE project.featured=true;
                   ''' % only_pros_sql)
        results = db.slave_session.execute(sql)
        for row in results:
            job = dict(name=send_weekly_stats_project,
                       args=[row.id],
                       kwargs={},
                       timeout=timeout,
                       queue='low')
            yield job
1240

1241

1242
def send_weekly_stats_project(project_id):
    """Queue a weekly stats email to a project's owner.

    Refreshes the project stats, derives headline numbers for the last
    week, renders the text/html templates and enqueues the mail job.

    :param project_id: id of the project to report on.
    :return: a skip message when the owner opted out; otherwise None.
    """
    from pybossa.cache.project_stats import update_stats, get_stats
    from pybossa.core import project_repo
    from datetime import datetime
    project = project_repo.get(project_id)
    if project.owner.subscribed is False or project.owner.restrict:
        return "Owner does not want updates by email"
    update_stats(project_id)
    dates_stats, hours_stats, users_stats = get_stats(project_id,
                                                      period='1 week')
    subject = "Weekly Update: %s" % project.name

    timeout = current_app.config.get('TIMEOUT')

    # Max number of completed tasks
    # NOTE(review): relies on dates_stats[3]/[0] being series whose
    # 'values' are (timestamp_ms, count) pairs — confirm against
    # pybossa.cache.project_stats.get_stats.
    n_completed_tasks = 0
    xy = list(zip(*dates_stats[3]['values']))
    n_completed_tasks = max(xy[1])
    # Most active day
    xy = list(zip(*dates_stats[0]['values']))
    active_day = [xy[0][xy[1].index(max(xy[1]))], max(xy[1])]
    # Timestamps are in milliseconds; convert to a weekday name.
    active_day[0] = datetime.fromtimestamp(active_day[0]/1000).strftime('%A')
    body = render_template('/account/email/weeklystats.md',
                           project=project,
                           dates_stats=dates_stats,
                           hours_stats=hours_stats,
                           users_stats=users_stats,
                           n_completed_tasks=n_completed_tasks,
                           active_day=active_day,
                           config=current_app.config)
    html = render_template('/account/email/weeklystats.html',
                           project=project,
                           dates_stats=dates_stats,
                           hours_stats=hours_stats,
                           users_stats=users_stats,
                           active_day=active_day,
                           n_completed_tasks=n_completed_tasks,
                           config=current_app.config)
    mail_dict = dict(recipients=[project.owner.email_addr],
                     subject=subject,
                     body=body,
                     html=html)

    job = dict(name=send_mail,
               args=[mail_dict],
               kwargs={},
               timeout=timeout,
               queue='high')
    enqueue_job(job)
1✔
1291

1292

1293
def news():
    """Get news from different ATOM RSS feeds.

    Polls each configured feed; when a feed's newest entry differs from
    the cached one, stores the pickled entry in the Redis sorted set
    (scored by feed index) and notifies admins once at the end.
    """
    import feedparser
    from pybossa.core import sentinel
    from pybossa.news import get_news, notify_news_admins, FEED_KEY
    try:
        import pickle as pickle
    except ImportError:  # pragma: no cover
        import pickle
    urls = ['https://github.com/Scifabric/pybossa/releases.atom',
            'http://scifabric.com/blog/all.atom.xml']
    score = 0
    notify = False
    if current_app.config.get('NEWS_URL'):
        urls += current_app.config.get('NEWS_URL')
    for url in urls:
        d = feedparser.parse(url)
        # Cached entry for this feed index (score doubles as feed id).
        tmp = get_news(score)
        # NOTE(review): `and` binds tighter than `or`, so this reads
        # (entries and no cache) or (cached updated != feed updated);
        # when both feed and cache are empty the second clause would
        # index tmp[0] — confirm feeds are never empty in practice.
        if (d.entries and (len(tmp) == 0)
           or (tmp[0]['updated'] != d.entries[0]['updated'])):
            mapping = dict()
            mapping[pickle.dumps(d.entries[0])] = float(score)
            sentinel.master.zadd(FEED_KEY, mapping)
            notify = True
        score += 1
    if notify:
        notify_news_admins()
1✔
1320

1321
def check_failed():
    """Check the jobs that have failed and requeue them.

    Walks the RQ failed-job registry.  Each failed job is requeued up to
    FAILED_JOBS_RETRIES times; the retry count lives in Redis under
    ``pybossa:job:failed:<job_id>`` with a TTL of FAILED_JOBS_MAILS days.
    Once the retry budget is exhausted, the admins are emailed at most
    once per TTL window (tracked under ``pybossa:job:failed:mailed:<id>``).

    Returns a human-readable summary string.
    """
    from rq import requeue_job
    from rq.registry import FailedJobRegistry
    from pybossa.core import sentinel

    # Per https://github.com/rq/rq/blob/master/CHANGES.md
    # get_failed_queue has been removed
    # NOTE(review): FailedJobRegistry() is built without a queue/connection
    # argument — presumably rq falls back to a default connection here;
    # verify against the rq version pinned by the project.
    fq = FailedJobRegistry()
    job_ids = fq.get_job_ids()
    count = len(job_ids)
    FAILED_JOBS_RETRIES = current_app.config.get('FAILED_JOBS_RETRIES')
    for job_id in job_ids:
        # Per-job retry counter key.
        KEY = 'pybossa:job:failed:%s' % job_id
        job = fq.fetch_job(job_id)
        if sentinel.slave.exists(KEY):
            # Seen before: bump the retry counter.
            sentinel.master.incr(KEY)
        else:
            # First failure observed: start the counter at 1 with a TTL
            # of FAILED_JOBS_MAILS days so stale entries expire.
            ttl = current_app.config.get('FAILED_JOBS_MAILS')*24*60*60
            sentinel.master.setex(KEY, ttl, 1)
        if int(sentinel.slave.get(KEY)) < FAILED_JOBS_RETRIES:
            # Still within the retry budget: put the job back on a queue.
            requeue_job(job_id, sentinel.master)
        else:
            # Retry budget exhausted: email the admins, but only once per
            # TTL window (the "mailed" marker key below).
            KEY = 'pybossa:job:failed:mailed:%s' % job_id
            if (not sentinel.slave.exists(KEY) and
                    current_app.config.get('ADMINS')):
                subject = "JOB: %s has failed more than 3 times" % job_id
                body = "Please, review the background jobs of your server."
                body += "\n This is the trace error\n\n"
                body += "------------------------------\n\n"
                body += job.exc_info
                mail_dict = dict(recipients=current_app.config.get('ADMINS'),
                                 subject=subject, body=body)
                send_mail(mail_dict)
                ttl = current_app.config.get('FAILED_JOBS_MAILS')*24*60*60
                sentinel.master.setex(KEY, ttl, 1)
    if count > 0:
        return "JOBS: %s You have failed the system." % job_ids
    else:
        return "You have not failed the system"
1361

1362

1363
def mail_project_report(info, email_addr):
    """Generate a project report zip and email it to `email_addr`.

    When the email service is enabled the zip is uploaded as an email
    attachment to S3 and a download link is mailed instead of the file.
    On any failure an error notice is mailed and the exception re-raised
    so the background job is marked as failed.  The mail (success or
    error) is always sent from the ``finally`` block.

    :param info: report parameters passed to the project CSV exporter
    :param email_addr: a single address or a list of addresses
    """
    from pybossa.core import project_csv_exporter

    recipients = email_addr if isinstance(email_addr, list) else [email_addr]
    current_app.logger.info('Scheduling mail_project_report job {}'.format(str(info)))
    try:
        # zipfile must exist before any raise so `finally` can test it.
        zipfile = None
        filename = project_csv_exporter.zip_name(info)
        subject = '{} project report'.format(current_app.config['BRAND'])
        # Template filled in later with the branch-specific message.
        body = 'Hello,\n\n{}\n\nThe {} team.'

        zipfile = project_csv_exporter.generate_zip_files(info)
        if email_service.enabled:
            # Link-based delivery: upload the zip and mail a signed URL.
            current_app.logger.info("Uploading email attachment to s3 for project report. user email %s", email_addr)
            # Display-only expiry; assumes the upload's retention is also
            # 90 days — TODO confirm against upload_email_attachment.
            expiration_date = (datetime.now() + timedelta(days=90)).strftime('%a, %d %b %Y %H:%M:%S GMT')
            content = None
            with open(zipfile, mode='rb') as fp:  # open zipfile in binary mode
                content = fp.read()
            if not content:
                raise ValueError("No content in zipfile: {}".format(zipfile))

            url = upload_email_attachment(content, filename, email_addr)
            msg = f'<p>You can download your file <a href="{url}">here</a> until {expiration_date}.</p>'
            body = body.format(msg, current_app.config.get('BRAND'))
            mail_dict = dict(recipients=recipients,
                        subject=subject,
                        body=body)
            current_app.logger.info("Project report for user %s generated email with report link %s", email_addr, url)
        else:
            # Attachment-based delivery: ship the zip in the mail itself.
            msg = 'Your exported data is attached.'
            body = body.format(msg, current_app.config.get('BRAND'))
            mail_dict = dict(recipients=recipients,
                            subject=subject,
                            body=body)

            attachment = None
            with open(zipfile, mode='rb') as fp:  # open zipfile in binary mode
                attachment = Attachment(filename, "application/zip",
                                        fp.read())
            if not attachment:
                raise ValueError("No content in zipfile: {}".format(zipfile))
            mail_dict['attachments'] = [attachment]
    except Exception:
        # Replace the pending mail with an error notice, then re-raise so
        # the job is recorded as failed; `finally` still sends the notice.
        current_app.logger.exception('Error in mail_project_report')
        subject = 'Error in {} project report'.format(current_app.config['BRAND'])
        msg = 'An error occurred while exporting your report.'

        body = 'Hello,\n\n{}\n\nThe {} team.'
        body = body.format(msg, current_app.config.get('BRAND'))
        mail_dict = dict(recipients=recipients,
                         subject=subject,
                         body=body)
        raise
    finally:
        # Always clean up the temp zip and send whichever mail_dict the
        # try/except left behind.
        if zipfile:
            os.unlink(zipfile)
        if email_service.enabled:
            email_service.send(mail_dict)
        else:
            send_mail(mail_dict)
1423

1424

1425
def delete_account(user_id, admin_addr, **kwargs):
    """Delete user account from the system.

    Removes the user's avatar and personal data, unsubscribes them from
    Mailchimp when configured, and mails a confirmation to the user and
    the platform admins (BCCing the requesting admin).
    """
    from pybossa.core import (user_repo, uploader)

    user = user_repo.get(user_id)

    # Drop the stored avatar, if the user uploaded one.
    container = "user_%s" % user.id
    avatar = user.info.get('avatar')
    if avatar:
        uploader.delete_file(avatar, container)

    email = user.email_addr

    # Remove the user from the newsletter list when Mailchimp is set up;
    # otherwise there is nothing to delete and we treat it as done.
    mailchimp_deleted = True
    if current_app.config.get('MAILCHIMP_API_KEY'):
        from pybossa.core import newsletter
        newsletter.init_app(current_app)
        mailchimp_deleted = newsletter.delete_user(email)

    brand = current_app.config.get('BRAND')
    user_repo.delete_data(user)

    subject = '[%s]: Your account has been deleted' % brand
    body = """Hi,\n Your account and personal data has been deleted from %s.""" % brand
    if not mailchimp_deleted:
        body += '\nWe could not delete your Mailchimp account, please contact us to fix this issue.'
    if current_app.config.get('DISQUS_SECRET_KEY'):
        body += '\nDisqus does not provide an API method to delete your account. You will have to do it by hand yourself in the disqus.com site.'

    # Notify the user plus every configured admin; BCC the requester.
    recipients = [email]
    admins = current_app.config.get('ADMINS')
    if admins:
        recipients.extend(admins)
    mail_dict = dict(recipients=recipients,
                     bcc=[admin_addr],
                     subject=subject,
                     body=body)
    send_mail(mail_dict, mail_all=True)
1456

1457

1458
def export_userdata(user_id, admin_addr, **kwargs):
    """Email a user a zip archive containing their personal data.

    Builds an in-memory ``personal_data.zip`` with the user's account
    data (minus the password hash), renders the export-data email
    templates, and mails everything to the user, BCCing the requesting
    admin.

    :param user_id: id of the user whose data is exported
    :param admin_addr: address of the admin who requested the export
    """
    from pybossa.core import (user_repo)
    # Removed: unused JsonExporter instance and a redundant local
    # `from flask import current_app` (already imported at module level).

    user = user_repo.get(user_id)
    user_data = user.dictize()
    # Never expose the password hash — neither in the zip nor in the
    # email template context.
    del user_data['passwd_hash']

    buffer = BytesIO()  # ZipFile expects Bytes
    with ZipFile(buffer, 'w') as zf:
        zf.writestr('personal_data.json', json.dumps(user_data))
    buffer.seek(0)
    attachments = [Attachment('personal_data.zip', 'application/zip', buffer.read())]

    body = render_template('/account/email/exportdata.md',
                           user=user_data,
                           personal_data_link=None,
                           config=current_app.config)
    html = render_template('/account/email/exportdata.html',
                           user=user_data,
                           personal_data_link=None,
                           config=current_app.config)
    subject = 'Your personal data'
    mail_dict = dict(recipients=[user.email_addr],
                     bcc=[admin_addr],
                     subject=subject,
                     body=body,
                     html=html,
                     attachments=attachments)
    send_mail(mail_dict)
1489

1490

1491
def delete_file(fname, container):
    """Delete file `fname` from storage `container` via the app uploader.

    Returns whatever the uploader's delete_file reports.
    """
    from pybossa.core import uploader
    return uploader.delete_file(fname, container)
1495

1496
def load_usage_dashboard_data(days):
    """Collect usage-dashboard statistics for the last `days` days.

    Returns an OrderedDict mapping a display label to either a list of
    (count, None, None) tuples (site totals) or the per-component usage
    value reported by site_stats.
    """
    # Site-wide totals over the requested time window.
    total_counters = (
        ("Projects", site_stats.number_of_created_jobs),
        ("Tasks", site_stats.n_tasks_site),
        ("Taskruns", site_stats.n_task_runs_site),
    )

    stats = OrderedDict(
        (label, [(counter(days), None, None)])
        for label, counter in total_counters
    )

    # Component usage: number of projects using each configured component.
    components = current_app.config.get("USAGE_DASHBOARD_COMPONENTS", {})
    for label, tag in components.items():
        stats[label] = site_stats.n_projects_using_component(days=days,
                                                             component=tag)

    return stats
1513

1514
def load_management_dashboard_data():
    """Compute all charts and stats shown on the management dashboard.

    Returns a 6-tuple: (project_chart, category_chart, task_chart,
    submission_chart, timed_stats, current_stats).  Observed runtimes
    from QA are noted inline — task/submission charts dominate.
    """
    # Charts.
    project_chart = site_stats.project_chart()  # < 1s
    category_chart = site_stats.category_chart()  # < 1s
    task_chart = site_stats.task_chart()  # 110s in QA
    submission_chart = site_stats.submission_chart()  # 9s in QA

    # General platform usage, evaluated over several time windows.
    windowed_funcs = (
        site_stats.number_of_active_jobs,  # 1s
        site_stats.number_of_created_jobs,  # 1s
        site_stats.number_of_created_tasks,  # 90s(1.5,2.5,3s,81s) in QA with new index
        site_stats.number_of_completed_tasks,  # 300s(6s,94s,82s,102s) in QA
        site_stats.avg_time_to_complete_task,  # 24s(4s,4s,4s,12s) in QA
        site_stats.number_of_active_users,  # 35s(4s,4s,5s,11s,11s) in QA
        site_stats.categories_with_new_projects,  # 1s
    )

    # Work on platform, evaluated once with no window.
    snapshot_funcs = (
        site_stats.avg_task_per_job,  # < 1s
        site_stats.tasks_per_category,  # < 1s
    )

    # Keyed by each stat function's docstring, as the templates expect.
    timed_stats = OrderedDict()
    for fn in windowed_funcs:
        timed_stats[fn.__doc__] = OrderedDict(
            (window, fn(window)) for window in (30, 60, 90, 350, 'all')
        )

    current_stats = OrderedDict((fn.__doc__, fn()) for fn in snapshot_funcs)

    return (project_chart, category_chart, task_chart, submission_chart,
            timed_stats, current_stats)
1547

1548

1549
def get_management_dashboard_stats(user_email):
    """Rebuild management dashboard stats, notify user about its availability"""
    load_management_dashboard_data()

    # Tell the requester the (cached) stats are ready to view.
    notice = ('Management dashboard statistics is now available. '
              'It can be accessed by refreshing management dashboard page.')
    mail_body = ('Hello,\n\n{}\nThe {} team.'
                 .format(notice, current_app.config.get('BRAND')))
    send_mail(dict(recipients=[user_email],
                   subject='Management Dashboard Statistics',
                   body=mail_body))
1559

1560

1561
def check_and_send_task_notifications(project_id, conn=None):
    """Send progress-reminder notifications for a project when due.

    Reads the project's ``progress_reminder`` settings.  When the count
    of available tasks drops to or below ``target_remaining`` and no
    notification was sent yet, emails the project owners and optionally
    POSTs to a configured webhook; when the count rises back above the
    target, re-arms the notification.  The updated reminder state is
    persisted either through `conn` (DB listener path) or via the
    project repository (web request path).

    :param project_id: id of the project to check
    :param conn: optional DB connection used by the listener process
    """
    from pybossa.core import project_repo

    project = project_repo.get(project_id)
    if not project:
        return

    reminder = project.info.get('progress_reminder', {})
    target_remaining = reminder.get("target_remaining")
    webhook = reminder.get('webhook')
    email_already_sent = reminder.get("sent") or False
    # No threshold configured: notifications are disabled for the project.
    if target_remaining is None:
        return

    n_remaining_tasks = n_available_tasks(project.id)

    update_reminder = False
    # Queue grew back above the target: re-arm so the next drop notifies.
    if n_remaining_tasks > target_remaining and email_already_sent:
        current_app.logger.info('Project {}, the number of tasks in queue: {} \
                                exceeds target remaining: {}, \
                                resetting the send notification flag to True'
                                .format(project_id, n_remaining_tasks, target_remaining))
        reminder['sent'] = False
        update_reminder = True

    if n_remaining_tasks <= target_remaining and not email_already_sent:
        # incomplete tasks drop to or below, and email not sent yet, send email
        current_app.logger.info('Project {} the number of tasks in queue: {}, \
                                drops equal to or below target remaining: {}, \
                                sending task notification to owners: {}'
                                .format(project_id, n_remaining_tasks, target_remaining, project.owners_ids))
        email_addr = [cached_users.get_user_email(user_id)
                        for user_id in project.owners_ids]
        info = dict(project_name=project.name,
                    n_available_tasks=n_remaining_tasks)
        notify_task_progress(info, email_addr)

        reminder['sent'] = True
        update_reminder = True

        if webhook:
            current_app.logger.info('Project {} the number of tasks in queue: {}, \
                                drops equal to or below target remaining: {}, hitting webhook url: {}'
                                .format(project_id, n_remaining_tasks, target_remaining, webhook))
            try:
                headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
                # NOTE(review): 'remianing_tasks' is a typo in the webhook
                # payload key, but external consumers may already depend on
                # it — confirm before renaming to 'remaining_tasks'.
                data = dict(project_id=project_id,
                            project_name=project.name,
                            remianing_tasks=n_remaining_tasks,
                            target_remaining=target_remaining)
                webhook_response = requests.post(webhook, data=json.dumps(data), headers=headers)

                if webhook_response.status_code >= 400:
                    # Webhook misbehaving: disable it and tell the owners
                    # so they can fix and re-enable it in settings.
                    reminder['webhook'] = ''
                    # send email to project owners
                    subject = 'Webhook failed from {}'.format(project_id)
                    body = '\n'.join(
                        ['Hello,\n',
                        'The webhook {} returns {}, please make sure the webhook is valid.',
                        'Current webhook will be disabled, please re-activate it in task notification configuration.',
                        'Thank you,\n',
                        'The {} team.']).format(webhook, webhook_response.status_code, current_app.config.get('BRAND'))
                    mail_dict = dict(recipients=email_addr, subject=subject, body=body)
                    send_mail(mail_dict)
                    # Raised (then caught below) purely to log the failure.
                    raise Exception('webhook response error, returned {}'.format(webhook_response.status_code))
                else:
                    current_app.logger.info('Webhook {} posted'.format(webhook))
            except Exception as e:
                # Webhook failures must not prevent the reminder-state save.
                current_app.logger.exception('An error occured while posting to project {} webhook {}, {}'
                                           .format(project_id, webhook, str(e)))

    if update_reminder:
        project.info['progress_reminder'] = reminder
        if conn is not None:
            # Listener process is updating the task notification.
            sql = text(''' UPDATE project SET info=:info WHERE id=:id''')
            conn.execute(sql, dict(info=json.dumps(project.info), id=project_id))
        else:
            # User is updating the task notification from the project settings.
            project_repo.save(project)
1641

1642

1643
def export_all_users(fmt, email_addr):
    """Export all user accounts and email the result.

    Renders the user report as JSON or CSV.  When the email service is
    enabled the report is uploaded as an email attachment and a download
    link is mailed; otherwise the file is attached directly.  On failure
    a notice is mailed and the exception re-raised so the job is marked
    failed.  The mail is always sent from the ``finally`` block.

    :param fmt: export format, 'json' or 'csv'
    :param email_addr: a single address or a list of addresses
    """
    exportable_attributes = ('id', 'name', 'fullname', 'email_addr', 'locale',
                             'created', 'admin', 'subadmin', 'enabled', 'languages',
                             'locations', 'work_hours_from', 'work_hours_to',
                             'timezone', 'type_of_user', 'additional_comments',
                             'total_projects_contributed', 'completed_task_runs',
                             'percentage_tasks_completed', 'first_submission_date',
                             'last_submission_date', 'avg_time_per_task', 'consent',
                             'restrict')

    def gen_json():
        # Full report serialized as JSON.
        users = get_users_for_report()
        return json.dumps(users)

    def gen_csv():
        # Report restricted to the exportable columns, as CSV.
        users = get_users_for_report()
        df = pd.DataFrame.from_dict(users)
        return df.to_csv(columns=exportable_attributes, index=False)

    recipients = email_addr if isinstance(email_addr, list) else [email_addr]
    current_app.logger.info('Scheduling export_all_users job send to {} admins/users'
                            .format(len(recipients)))

    try:
        # KeyError on an unknown fmt falls into the except branch below.
        data = {"json": gen_json, "csv": gen_csv}[fmt]()
        if email_service.enabled:
            # Link-based delivery: upload the report, mail a signed URL.
            current_app.logger.info("Uploading email attachment to s3 for export users report. user email %s", email_addr)
            # Display-only expiry; assumes the upload retention is also
            # 90 days — TODO confirm against upload_email_attachment.
            expiration_date = (datetime.now() + timedelta(days=90)).strftime('%a, %d %b %Y %H:%M:%S GMT')
            filename = 'user_export.{}'.format(fmt)
            url = upload_email_attachment(data, filename, email_addr)
            body = f'<p>You can download your file <a href="{url}">here</a> until {expiration_date}.</p>'
            mail_dict = dict(recipients=recipients,
                        subject="User Export",
                        body=body)
            current_app.logger.info("Export users for user %s generated email with report link %s", email_addr, url)
        else:
            # Attachment-based delivery.
            mimetype = {"csv": "text/csv", "zip": "application/zip", "json": "application/json"}
            attachment = Attachment(
                'user_export.{}'.format(fmt),
                mimetype.get(fmt, "application/octet-stream"),
                data
            )
            mail_dict = {
                'recipients': recipients,
                'subject': 'User Export',
                'body': 'Your exported data is attached.',
                'attachments': [attachment]
            }
    except Exception as e:
        # Fix: use the normalized `recipients` list — the original wrapped
        # `email_addr` in a list even when it was already a list.
        mail_dict = {
            'recipients': recipients,
            'subject': 'User Export Failed',
            'body': 'User export failed, {}'.format(str(e))
        }
        raise
    finally:
        # Always send whichever mail_dict the try/except produced.
        if email_service.enabled:
            email_service.send(mail_dict)
        else:
            send_mail(mail_dict)
1708

1709

1710
# TODO: uncomment, reuse this under future PR
1711
# def get_completed_tasks_cleaup_jobs(queue="weekly"):
1712
#     """Return job that will perform cleanup of completed tasks."""
1713
#     timeout = current_app.config.get('TIMEOUT')
1714
#     job = dict(name=perform_completed_tasks_cleanup,
1715
#                 args=[],
1716
#                 kwargs={},
1717
#                 timeout=timeout,
1718
#                 queue=queue)
1719
#     yield job
1720

1721

1722
def perform_completed_tasks_cleanup():
    """Purge completed tasks for projects that opted into auto-cleanup.

    Reads the allowed retention windows from the
    ``COMPLETED_TASK_CLEANUP_DAYS`` config, finds projects whose
    ``completed_tasks_cleanup_days`` setting matches one of them, and
    purges each project's completed tasks older than that many days.
    """
    from sqlalchemy.sql import text
    from pybossa.core import db
    from pybossa.purge_data import purge_task_data

    # Config is a list of (days, label)-style tuples; keep the day values.
    valid_days = [days[0] for days in current_app.config.get('COMPLETED_TASK_CLEANUP_DAYS', [(None, None)]) if days[0]]
    if not valid_days:
        current_app.logger.info("Skipping perform completed tasks cleanup. Missing configuration COMPLETED_TASK_CLEANUP_DAYS.")
        return

    # identify projects that are set for automated completed tasks cleanup
    projects = []
    sql = text('''SELECT id as project_id, info->>'completed_tasks_cleanup_days' as cleanup_days FROM project
               WHERE info->>'completed_tasks_cleanup_days' IS NOT NULL
               ;''')
    results = db.slave_session.execute(sql)
    for row in results:
        project_id = row.project_id
        try:
            cleanup_days = int(row.cleanup_days)
        except ValueError:
            # Non-numeric setting: force a value that fails validation below.
            cleanup_days = -1
        if cleanup_days not in valid_days:
            current_app.logger.info(
                f"Skipping project cleanup days due to invalid cleanup days,"
                f"project id {project_id}, completed_tasks_cleanup_days {row.cleanup_days}, valid days {valid_days}"
            )
        else:
            projects.append((project_id, cleanup_days))

    for project in projects:
        project_id, cleanup_days = project
        # identify tasks that are set for automated completed tasks cleanup
        # NOTE(review): `:duration` sits inside a quoted interval literal
        # (':duration days'::INTERVAL).  This is exercised by tests, but
        # confirm the SQLAlchemy/driver combination binds it as intended.
        sql = text('''SELECT id AS task_id FROM task
                    WHERE  project_id=:project_id AND
                    state=:state AND
                    TO_DATE(created, 'YYYY-MM-DD"T"HH24:MI:SS.US') <= NOW() - ':duration days' :: INTERVAL
                    ORDER BY created;
                ;''')
        params = dict(project_id=project_id, state="completed", duration=cleanup_days)
        results = db.slave_session.execute(sql, params)
        # rowcount on a SELECT is driver-dependent; psycopg2 populates it.
        total_tasks = results.rowcount if results else 0
        current_app.logger.info(f"Performing cleanup of {total_tasks} completed tasks for project {project_id} that are older than {cleanup_days} days or more.")
        for row in results:
            purge_task_data(row.task_id, project_id)
        current_app.logger.info(f"Finished cleanup of completed tasks for project {project_id}")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc