• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bloomberg / pybossa / 12266177335

10 Dec 2024 10:56PM UTC coverage: 94.108% (+0.02%) from 94.084%
12266177335

Pull #1013

github

dchhabda
remove incremental sched
Pull Request #1013: RDISCROWD-7806: Remove bfs, dfs unused schedulers

1 of 1 new or added line in 1 file covered. (100.0%)

2 existing lines in 2 files now uncovered.

17443 of 18535 relevant lines covered (94.11%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.4
/pybossa/sched.py
1
# -*- coding: utf8 -*-
2
# This file is part of PYBOSSA.
3
#
4
# Copyright (C) 2015 Scifabric LTD.
5
#
6
# PYBOSSA is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# PYBOSSA is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with PYBOSSA.  If not, see <http://www.gnu.org/licenses/>.
18
"""Scheduler module for PYBOSSA tasks."""
1✔
19
import sys
1✔
20
from functools import wraps
1✔
21
from sqlalchemy.sql import func, desc, text
1✔
22
from sqlalchemy.sql import and_, or_
1✔
23
from pybossa.model import DomainObject
1✔
24
from pybossa.model.task import Task
1✔
25
from pybossa.model.task_run import TaskRun
1✔
26
from pybossa.core import db, sentinel, project_repo, task_repo
1✔
27
from .redis_lock import (LockManager, get_active_user_key, get_user_tasks_key,
1✔
28
                         get_task_users_key, get_task_id_project_id_key,
29
                         register_active_user, unregister_active_user,
30
                         get_active_user_count,
31
                         EXPIRE_RESERVE_TASK_LOCK_DELAY, EXPIRE_LOCK_DELAY)
32
from .contributions_guard import ContributionsGuard
1✔
33
from werkzeug.exceptions import BadRequest, Forbidden
1✔
34
import random
1✔
35
import json
1✔
36
from pybossa.cache import users as cached_users
1✔
37
from pybossa.cache import task_browse_helpers as cached_task_browse_helpers
1✔
38
from flask import current_app
1✔
39
from pybossa import data_access
1✔
40
from datetime import datetime
1✔
41
import re
1✔
42

43
from pybossa.util import SavedTaskPositionEnum, get_user_saved_partial_tasks
1✔
44

45
# Database session used for every query issued from this module
# (``session.query`` / ``session.execute`` below).
# NOTE(review): ``slave_session`` presumably points at a read replica — confirm.
session = db.slave_session
46

47

48
class Schedulers(object):
    """Names under which a project can configure its task scheduler.

    ``new_task`` maps each name to a scheduler function; unknown names fall
    back to the locked scheduler.
    """

    # Mapped to get_locked_task by new_task().
    locked = 'locked_scheduler'
    # Mapped to get_user_pref_task; filters candidates by user preferences.
    user_pref = 'user_pref_scheduler'
    # Also mapped to get_user_pref_task; additionally honours an explicit
    # task_id and task-category reservations.
    task_queue = 'task_queue_scheduler'
53

54

55
# Name of the scheduler treated as the default.
# NOTE(review): not referenced in the visible part of this module; presumably
# consumed by get_project_scheduler or callers elsewhere — confirm.
DEFAULT_SCHEDULER = Schedulers.locked
# Default lock timeout, shared with ContributionsGuard's stamp TTL. Used when a
# project does not configure its own ``timeout`` and by can_post/after_save.
# NOTE(review): unit presumably seconds — confirm against ContributionsGuard.
TIMEOUT = ContributionsGuard.STAMP_TTL
57

58

59
def new_task(project_id, sched, user_id=None, user_ip=None,
             external_uid=None, offset=0, limit=1, orderby='priority_0',
             desc=True, rand_within_priority=False,
             gold_only=False, task_id=None, saved_task_position=None):
    """Select a task for a contributor using the scheduler named by ``sched``.

    Unknown scheduler names fall back to the locked scheduler. The gold-task
    mode passed on as ``task_type`` is chosen as follows:

    * ``gold``       -- ``gold_only`` was requested;
    * ``no_gold``    -- the project disables gold (``enable_gold`` false);
    * ``gold_first`` -- with probability ``project.get_gold_task_probability()``;
    * ``gold_last``  -- otherwise.

    ``task_id`` is only forwarded for the task-queue scheduler, and user
    preference filtering is enabled for the user-pref and task-queue
    schedulers. Returns whatever the selected scheduler returns.
    """
    schedulers_by_name = {
        'default': get_locked_task,
        Schedulers.locked: get_locked_task,
        Schedulers.user_pref: get_user_pref_task,
        Schedulers.task_queue: get_user_pref_task,
    }
    scheduler = schedulers_by_name.get(sched, get_locked_task)

    project = project_repo.get(project_id)
    gold_enabled = project.info.get('enable_gold', True)

    if gold_only:
        task_type = 'gold'
    elif not gold_enabled:
        # Kept ahead of the random draw so tests with gold disabled are deterministic.
        task_type = 'no_gold'
    elif random.random() < project.get_gold_task_probability():
        task_type = 'gold_first'
    else:
        task_type = 'gold_last'

    uses_user_prefs = sched in [Schedulers.user_pref, Schedulers.task_queue]
    requested_task_id = task_id if sched in [Schedulers.task_queue] else None

    return scheduler(
        project_id, user_id, user_ip, external_uid,
        offset=offset,
        limit=limit,
        orderby=orderby,
        desc=desc,
        rand_within_priority=rand_within_priority,
        filter_user_prefs=uses_user_prefs,
        task_type=task_type,
        task_id=requested_task_id,
        saved_task_position=saved_task_position,
    )
96

97

98
def is_locking_scheduler(sched):
    """Return True when ``sched`` names a scheduler that locks handed-out tasks.

    ``'default'`` is included because new_task() resolves it to get_locked_task.
    """
    locking_schedulers = (
        Schedulers.locked,
        Schedulers.user_pref,
        Schedulers.task_queue,
        'default',
    )
    return sched in locking_schedulers
100

101

102
def can_read_task(task, user):
    """Return whether ``user`` may read ``task``.

    For locking schedulers the user needs general read access
    (``has_read_access``) or must currently hold the lock on the task; for any
    other scheduler reading is always allowed.
    """
    scheduler, timeout = get_project_scheduler_and_timeout(task.project_id)
    return (not is_locking_scheduler(scheduler)
            or has_read_access(user)
            or has_lock(task.id, user.id, timeout))
110

111

112
def can_post(project_id, task_id, user_id_or_ip):
    """Return whether the contributor may post a task run for ``task_id``.

    ``user_id_or_ip`` is a mapping with ``user_id``, ``external_uid`` and
    ``user_ip`` entries; the first truthy one identifies the contributor,
    falling back to ``'127.0.0.1'``. Under a locking scheduler the contributor
    must hold the task lock (checked with the module-wide TIMEOUT); other
    schedulers always allow posting.
    """
    scheduler = get_project_scheduler(project_id, session)
    if not is_locking_scheduler(scheduler):
        return True
    contributor = (user_id_or_ip['user_id']
                   or user_id_or_ip['external_uid']
                   or user_id_or_ip['user_ip']
                   or '127.0.0.1')
    return has_lock(task_id, contributor, TIMEOUT)
123

124

125
def after_save(task_run, conn):
    """Release the locks held for a saved task run's task.

    The contributor is the run's ``user_id``, ``external_uid`` or ``user_ip``
    (first truthy one, else ``'127.0.0.1'``). For locking schedulers both the
    task lock and the reserved-category lock are released using the
    module-wide TIMEOUT; nothing happens for other schedulers.
    """
    scheduler = get_project_scheduler(task_run.project_id, conn)
    contributor = (task_run.user_id
                   or task_run.external_uid
                   or task_run.user_ip
                   or '127.0.0.1')
    if not is_locking_scheduler(scheduler):
        return
    release_lock(task_run.task_id, contributor, TIMEOUT)
    release_reserve_task_lock_by_id(task_run.project_id, task_run.task_id, contributor, TIMEOUT)
134

135

136
def locked_scheduler(query_factory):
    """Decorate a SQL query factory so that it becomes a locking task scheduler.

    ``query_factory(project_id, user_id=..., limit=..., rand_within_priority=...,
    task_type=..., task_category_filters=...)`` must return a statement that
    ``session.execute`` accepts and whose rows unpack as ``(task_id, taskcount,
    n_answers, calibration, worker_filter, worker_pref, timeout)`` -- the
    columns selected by ``locked_task_sql``.

    The returned function selects candidate tasks, ranks them and tries to
    acquire a Redis lock on each in turn. It returns:

    * ``[task]`` for an explicitly requested ``task_id`` (no lock taken), for a
      task the user already holds, or for the task it managed to lock;
    * ``[]`` when no candidate could be locked;
    * ``None`` when ``offset`` is 1 or 2.

    Raises ``BadRequest`` when ``offset`` is greater than 2.
    """
    @wraps(query_factory)
    def template_get_locked_task(project_id, user_id=None, user_ip=None,
                                 external_uid=None, limit=1, offset=0,
                                 orderby='priority_0', desc=True,
                                 rand_within_priority=False, task_type='gold_last',
                                 filter_user_prefs=False,
                                 task_category_filters="",
                                 task_id=None,
                                 saved_task_position=None):
        # NOTE(review): user_ip, external_uid, orderby, desc and
        # task_category_filters are accepted but never read below, and the
        # ``limit`` argument is overwritten before the query runs.

        # An explicitly requested task is returned as-is, without locking it.
        if task_id:
            task = session.query(Task).get(task_id)
            if task:
                return [task]

        # Only offsets 0-2 are valid; offsets 1 and 2 yield no task.
        if offset > 2:
            raise BadRequest('')
        if offset > 0:
            return None
        project = project_repo.get(project_id)
        # Project-level lock timeout, falling back to the module default.
        timeout = project.info.get('timeout', TIMEOUT)
        task_queue_scheduler = project.info.get("sched", "default") in [Schedulers.task_queue]
        # Task-info field names whose values define a reservable task category.
        reserve_task_config = project.info.get("reserve_tasks", {}).get("category", [])

        # When saved_task_position is set (e.g. "first"/"last"), ignore the task
        # the user currently holds and always pick candidates from the DB.
        task_id, lock_seconds = (None, 0) if saved_task_position else get_task_id_and_duration_for_project_user(project_id, user_id)

        # Hand back the task the user already has locked while more than
        # 10 seconds of that lock remain.
        if lock_seconds > 10:
            task = session.query(Task).get(task_id)
            if task:
                return [task]
        user_count = get_active_user_count(project_id, sentinel.master)
        # Passed to the query as the ``assign_user`` bind parameter; None for
        # anonymous users.
        assign_user = json.dumps({'assign_user': [cached_users.get_user_email(user_id)]}) if user_id else None
        current_app.logger.info(
            "Project {} - number of current users: {}"
            .format(project_id, user_count))

        sql_filters, exclude_user = "", False
        if task_queue_scheduler and reserve_task_config:
            # First restrict the search to categories this user already reserved.
            sql_filters, category_keys = get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id)
            if not category_keys:
                # No category is reserved by the current user: search categories
                # excluding the ones reserved by other users.
                current_app.logger.info(
                    "Project %s, user %s, %s", project_id, user_id,
                    "No task category reserved by user. Search tasks excuding categories reserved by other users"
                )
                exclude_user = True
                sql_filters, category_keys = get_reserve_task_category_info(
                    reserve_task_config, project_id, timeout, user_id, exclude_user
                )
                current_app.logger.info("SQL filter excuding task categories reserved by other users. sql filter %s", sql_filters)

        # Candidate batch size: DB_MAXIMUM_BATCH_SIZE when user preferences are
        # evaluated below; otherwise active users + 5 + MAX_SAVED_ANSWERS.
        # NOTE(review): presumably sized so tasks locked by other active users
        # can be skipped — confirm.
        limit = current_app.config.get('DB_MAXIMUM_BATCH_SIZE') if filter_user_prefs else user_count + 5 + current_app.config.get('MAX_SAVED_ANSWERS', 30)
        sql = query_factory(project_id, user_id=user_id, limit=limit,
                            rand_within_priority=rand_within_priority,
                            task_type=task_type, task_category_filters=sql_filters)
        rows = session.execute(sql, dict(project_id=project_id,
                                         user_id=user_id,
                                         assign_user=assign_user,
                                         limit=limit))

        if task_queue_scheduler and reserve_task_config and rows and not rows.rowcount and not exclude_user:
            # The user's reserved categories have no open task left: release
            # those reservations and query again, excluding only the
            # categories reserved by other users (exclude_user = True).
            current_app.logger.info(
                "Project %s, user %s, %s", project_id, user_id,
                "No task exist with task category already reserved by user. Search tasks excuding categories reserved by other users"
            )
            exclude_user = True
            release_reserve_task_lock_by_keys(category_keys, timeout)
            sql_filters, category_keys = get_reserve_task_category_info(
                reserve_task_config, project_id, timeout, user_id, exclude_user
            )
            current_app.logger.info("SQL filter excuding task categories reserved by other users. sql filter %s", sql_filters)
            sql = query_factory(project_id, user_id=user_id, limit=limit,
                            rand_within_priority=rand_within_priority,
                            task_type=task_type, task_category_filters=sql_filters)
            rows = session.execute(sql, dict(project_id=project_id,
                                            user_id=user_id,
                                            assign_user=assign_user,
                                            limit=limit))

        user_profile = cached_users.get_user_profile_metadata(user_id)

        # Get all saved task IDs from Redis for the current user
        task_id_map = None
        if saved_task_position:
            task_id_map = get_user_saved_partial_tasks(sentinel, project_id, user_id)

        # Validate user qualification and calculate a preference score per task.
        # Each entry is (task_id, taskcount, n_answers, calibration, score, None, timeout).
        # NOTE: the loop variable ``timeout`` shadows the project timeout above;
        # from here on it is the per-row value.
        user_profile = json.loads(user_profile) if user_profile else {}
        task_rank_info = []
        for task_id, taskcount, n_answers, calibration, w_filter, w_pref, timeout in rows:
            score = 0
            # With saved tasks present, every row is kept and only saved tasks
            # get a non-zero score; preference filtering is not applied here.
            if task_id_map:
                ttl = task_id_map.get(task_id, -1)
                if ttl > 0 and saved_task_position == SavedTaskPositionEnum.LAST:
                    score = -ttl  # Saved tasks sink to the bottom, but with earliest saved task first
                elif ttl > 0 and saved_task_position == SavedTaskPositionEnum.FIRST:
                    score = sys.maxsize - ttl  # Earliest saved task first
                task_rank_info.append((task_id, taskcount, n_answers, calibration, score, None, timeout))
            elif filter_user_prefs:  # Only include when filter requirement is met
                w_pref = w_pref or {}
                w_filter = w_filter or {}
                meet_requirement = cached_task_browse_helpers.user_meet_task_requirement(task_id, w_filter, user_profile)
                if meet_requirement:
                    score = cached_task_browse_helpers.get_task_preference_score(w_pref, user_profile)
                    task_rank_info.append((task_id, taskcount, n_answers, calibration, score, None, timeout))
            else:  # Default/locker schedulers
                task_rank_info.append((task_id, taskcount, n_answers, calibration, score, None, timeout))
        # Highest score first; sorted() is stable, so ties keep the query order.
        rows = sorted(task_rank_info, key=lambda tup: tup[4], reverse=True)

        # Iterate a list of tasks but only lock one task and return the locked task
        for task_id, taskcount, n_answers, calibration, _, _, timeout in rows:
            timeout = timeout or TIMEOUT
            # Gold (calibration) tasks get an unbounded lock limit; other tasks
            # are limited to the answers still needed.
            remaining = float('inf') if calibration else n_answers - taskcount
            if acquire_locks(task_id, user_id, remaining, timeout):
                # reserve tasks
                acquire_reserve_task_lock(project_id, task_id, user_id, timeout)
                return _lock_task_for_user(task_id, project_id, user_id, timeout, calibration)
        return []

    return template_get_locked_task
263

264

265
def reserve_task_sql_filters(project_id, reserve_task_keys, exclude):
    """Build a SQL filter clause from reserved task-category Redis keys.

    Each key looks like ``reserve_task:project:<id>:category:<category>:user...``.
    The ``<category>`` part is a ``field:value[:field:value...]`` list that is
    turned into a condition, e.g. ``co_name:IBM:ticker:IBM_US`` becomes
    ``(task.info->>'co_name' = 'IBM' AND task.info->>'ticker' = 'IBM_US')``.
    The conditions of all categories are OR-ed together.

    Args:
        project_id: project whose keys are considered; keys of other projects
            are ignored.
        reserve_task_keys: Redis keys (strings) of reserved task categories.
        exclude: when true, the clause selects tasks *outside* these
            categories (``IS NOT TRUE`` also keeps rows where the condition is
            NULL).

    Returns:
        ``(filters, category_keys)``. ``filters`` is either ``""`` or a string
        starting with ``" AND "`` that is ready to append to a WHERE clause.
        ``category_keys`` lists the input keys that belong to ``project_id``.
    """
    filters, category_keys = "", []

    if not (project_id and reserve_task_keys):
        return filters, category_keys

    def _sql_literal(text_value):
        # Double single quotes so the text is safe inside a SQL string literal.
        return text_value.replace("'", "''")

    filter_dict = {}
    current_app.logger.info("Project %s, exclude %s. Build sql filter from reserver task keys", project_id, exclude)
    current_app.logger.info("reserve tasks keys: %s", json.dumps(reserve_task_keys))
    regex_key = "reserve_task:project:{}:category:(.+?):user".format(project_id)

    for item in reserve_task_keys:
        data = re.search(regex_key, item)
        if not data:
            continue

        category_keys.append(item)
        category = data.group(1)
        if category in filter_dict:
            continue

        category_fv = category.split(":")
        if len(category_fv) % 2:
            # A category must be field/value pairs; indexing the missing value
            # used to raise IndexError here.
            current_app.logger.warning(
                "Skipping malformed reserve task category %s in key %s", category, item)
            continue

        filter_list = [
            "task.info->>'{}' = '{}'".format(_sql_literal(key), _sql_literal(value))
            for key, value in zip(category_fv[0::2], category_fv[1::2])
        ]
        filter_dict[category] = "({})".format(" AND ".join(filter_list))

    if filter_dict:
        exclude_clause = "IS NOT TRUE " if exclude else ""
        filters = " AND ({}) {}".format(" OR ".join(filter_dict.values()), exclude_clause)

    current_app.logger.info("sql filter %s, reserve keys %s", filters, json.dumps(category_keys))

    # TODO: pull task # from category keys, look for values from task._add_user_info
    # generate sql_filter considering value field type instead.
    return filters, category_keys
312

313

314
def get_reserve_task_key(task_id):
    """Return the reserved-category string for ``task_id``, or ``""``.

    The string is ``field:value`` pairs joined by ``:``, with the fields of
    the project's ``reserve_tasks.category`` config in sorted order and the
    values taken from ``task.info``. ``""`` is returned when any of these
    hold:

    * the task does not exist;
    * the project does not use the task-queue scheduler;
    * no category is configured;
    * the task lacks one of the configured fields.
    """
    task = task_repo.get_task(task_id)
    if not task:
        return ""

    project = project_repo.get(task.project_id)
    if not project or project.info.get("sched", "default") not in [Schedulers.task_queue]:
        return ""

    reserve_task_config = project.info.get("reserve_tasks", {}).get("category", [])
    if not reserve_task_config:
        return ""
    if any(field not in task.info for field in reserve_task_config):
        return ""

    return ":".join("{}:{}".format(field, task.info[field])
                    for field in sorted(reserve_task_config))
333

334

335
def get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id, exclude_user=False):
    """Get reserved category info for a given user under a given project.

    Looks up the reserved task-category lock keys for ``user_id`` (or, with
    ``exclude_user``, the keys to exclude) and converts them into a SQL filter
    via ``reserve_task_sql_filters``.

    Returns ``(sql_filters, category_keys)``. Both are empty when no category
    is configured or on a private instance. When the lock manager finds no
    keys, its (falsy) result is returned as ``category_keys`` together with
    ``""``.
    """
    timeout = timeout or TIMEOUT

    if not reserve_task_config:
        return "", []

    if current_app.config.get('PRIVATE_INSTANCE'):
        current_app.logger.info("Reserve task by category disabled for private instance. project_id %s, reserve_task_config %s",
            project_id, str(reserve_task_config))
        return "", []

    # Wildcard pattern over the sorted configured fields, e.g. "co_name:*:ticker:*".
    category_pattern = ":".join("{}:*".format(field) for field in sorted(reserve_task_config))
    lock_manager = LockManager(sentinel.master, timeout)
    found_keys = lock_manager.get_task_category_lock(project_id, user_id, category_pattern, exclude_user)
    current_app.logger.info(
        "Project %s, user %s, reserve config %s, exclude %s. reserve task category keys %s",
        project_id, user_id, json.dumps(reserve_task_config), exclude_user, str(found_keys)
    )
    if not found_keys:
        return "", found_keys

    sql_filters, category_keys = reserve_task_sql_filters(project_id, found_keys, exclude_user)
    return sql_filters, category_keys
360

361

362
def locked_task_sql(project_id, user_id=None, limit=1, rand_within_priority=False,
                    task_type='gold_last', filter_user_prefs=False,
                    priority_sort=True, task_category_filters=""):
    """Build the SQL statement that lists candidate tasks for a user.

    ``task_type`` selects which tasks are returned and in which order:

    * ``gold``       -- only gold (calibration) tasks;
    * ``no_gold``    -- only non-gold tasks;
    * ``gold_last``  -- non-gold tasks before gold tasks (default);
    * ``gold_first`` -- gold tasks before non-gold tasks.

    With ``filter_user_prefs`` the user's cached preference and filter SQL
    fragments are added to the WHERE clause. ``task_category_filters`` is
    appended verbatim. ``project_id``, ``user_id`` and ``limit`` are not
    interpolated; callers bind them as ``:project_id``, ``:user_id`` and
    ``:limit`` when executing the statement.
    """
    where_extras = []
    if filter_user_prefs:
        preference_sql = cached_users.get_user_preferences(user_id)
        user_filter_sql = cached_users.get_user_filters(user_id)
        where_extras.append('AND ({}) AND ({})'.format(preference_sql, user_filter_sql))
    if task_type == 'gold':
        where_extras.append('AND task.calibration = 1')
    elif task_type == 'no_gold':
        where_extras.append('AND task.calibration != 1')

    if task_type == 'gold_last':
        sort_terms = ['task.calibration']
    elif task_type == 'gold_first':
        sort_terms = ['task.calibration DESC NULLS LAST']
    else:
        sort_terms = []
    if priority_sort:
        sort_terms.append('priority_0 DESC')
    sort_terms.append('random()' if rand_within_priority else 'id ASC')

    template = '''
           SELECT task.id, COUNT(task_run.task_id) AS taskcount, n_answers, task.calibration,
           worker_filter, worker_pref,
              (SELECT info->'timeout'
               FROM project
               WHERE id=:project_id) as timeout
           FROM task
           LEFT JOIN task_run ON (task.id = task_run.task_id)
           WHERE NOT EXISTS
           (SELECT 1 FROM task_run WHERE project_id=:project_id AND
           user_id=:user_id AND task_id=task.id)
           AND task.project_id=:project_id
           AND ((task.expiration IS NULL) OR (task.expiration > (now() at time zone 'utc')::timestamp))
           AND task.state !='completed'
           AND task.state !='enrich'
           {}
           {}
           group by task.id
           ORDER BY {}
           LIMIT :limit;
           '''
    query = template.format(' '.join(where_extras), task_category_filters, ','.join(sort_terms))
    return text(query)
416

417

418
def select_contributable_task(project, user_id, **kwargs):
    """Return the first task ``locked_task_sql`` selects for ``user_id``, or ``{}``.

    No lock is acquired. User-preference filtering is switched on for the
    user-pref and task-queue schedulers (overriding any ``filter_user_prefs``
    in ``kwargs``). The remaining ``kwargs`` are forwarded to
    ``locked_task_sql``.
    """
    sched, _ = get_scheduler_and_timeout(project)
    uses_user_prefs = sched in [Schedulers.user_pref, Schedulers.task_queue]
    kwargs['filter_user_prefs'] = uses_user_prefs

    bind_params = {'project_id': project.id, 'user_id': user_id, 'limit': 1}
    if uses_user_prefs:
        bind_params['assign_user'] = None

    query = locked_task_sql(project.id, user_id, **kwargs)
    first_row = next(iter(session.execute(query, bind_params)), None)
    if first_row is None:
        return {}
    return task_repo.get_task(first_row.id)
432

433

434
def select_task_for_gold_mode(project, user_id):
    """Pick a random non-gold task (ignoring priority) for ``user_id`` without locking it."""
    gold_mode_options = dict(rand_within_priority=True,
                             task_type='no_gold',
                             priority_sort=False)
    return select_contributable_task(project, user_id, **gold_mode_options)
437

438

439
@locked_scheduler
def get_locked_task(project_id, user_id=None, limit=1, rand_within_priority=False,
                    task_type='gold_last', task_category_filters="", saved_task_position=None):
    """Build the candidate-task query for the default locked scheduler.

    User-preference SQL filtering is never applied here. The
    ``locked_scheduler`` decorator executes the query and locks one of the
    returned tasks.
    """
    query_options = dict(user_id=user_id,
                         limit=limit,
                         rand_within_priority=rand_within_priority,
                         task_type=task_type,
                         filter_user_prefs=False,
                         task_category_filters=task_category_filters)
    return locked_task_sql(project_id, **query_options)
445

446

447
@locked_scheduler
def get_user_pref_task(project_id, user_id=None, limit=1, rand_within_priority=False,
                       task_type='gold_last', filter_user_prefs=True, task_category_filters="", saved_task_position=None):
    """Build the candidate-task query filtered by the user's preferences.

    Used for the user-pref and task-queue schedulers. The SQL always includes
    the user's cached preference and filter fragments; the
    ``filter_user_prefs`` argument is accepted but not consulted. The
    ``locked_scheduler`` decorator executes the query, can score rows against
    the user profile, and locks one task. Offset handling also lives in the
    decorator: offsets 1-2 return None and larger ones raise BadRequest.
    """
    query_options = dict(user_id=user_id,
                         limit=limit,
                         rand_within_priority=rand_within_priority,
                         task_type=task_type,
                         filter_user_prefs=True,
                         task_category_filters=task_category_filters)
    return locked_task_sql(project_id, **query_options)
461

462

463
def fetch_lock_for_user(project_id, task_id, user_id):
    """Return ``(timeout, ttl)`` describing ``user_id``'s lock on ``task_id``.

    ``timeout`` is the project's configured lock timeout. ``ttl`` is the
    user's entry from ``get_locks`` (keyed by ``str(user_id)``). It is None
    when the scheduler does not lock tasks or the user holds no lock on the
    task.
    """
    scheduler, timeout = get_project_scheduler_and_timeout(project_id)
    holds_lock = is_locking_scheduler(scheduler) and has_lock(task_id, user_id, timeout)
    if not holds_lock:
        return timeout, None
    return timeout, get_locks(task_id, timeout).get(str(user_id))
474

475

476
def has_lock(task_id, user_id, timeout):
    """Return whether ``user_id`` is among the lock holders of ``task_id``."""
    manager = LockManager(sentinel.master, timeout)
    return manager.has_lock(get_task_users_key(task_id), user_id)
480

481

482
def acquire_locks(task_id, user_id, limit, timeout):
    """Lock ``task_id`` for ``user_id``; return True on success.

    The task-side lock is taken with ``limit`` as its holder limit. Only when
    that succeeds is the task also recorded on the user's side (unbounded
    limit).
    """
    manager = LockManager(sentinel.master, timeout)
    users_of_task_key = get_task_users_key(task_id)
    tasks_of_user_key = get_user_tasks_key(user_id)
    if not manager.acquire_lock(users_of_task_key, user_id, limit):
        return False
    manager.acquire_lock(tasks_of_user_key, task_id, float('inf'))
    return True
490

491

492
def release_reserve_task_lock_by_id(project_id, task_id, user_id, timeout, expiry=EXPIRE_RESERVE_TASK_LOCK_DELAY, release_all_task=False):
    """Release the reserved-category lock(s) held by ``user_id`` for ``task_id``'s category.

    Without ``release_all_task`` only the key for ``task_id`` is released.
    With it, every key of the user in that category is released, except keys
    of other tasks the user still holds with more than EXPIRE_LOCK_DELAY
    seconds left (e.g. open in another tab). Does nothing when the task has no
    reserve key (see ``get_reserve_task_key``).
    """
    reserve_key = get_reserve_task_key(task_id)
    if not reserve_key:
        return

    lock_manager = LockManager(sentinel.master, timeout)

    if not release_all_task:
        single_key = "reserve_task:project:{}:category:{}:user:{}:task:{}".format(
            project_id, reserve_key, user_id, task_id)
        lock_manager.release_reserve_task_lock(single_key, expiry)
        current_app.logger.info(
            "Release reserve task lock. project %s, task %s, user %s, expiry %d",
            project_id, task_id, user_id, expiry
        )
        return

    pattern = "reserve_task:project:{}:category:{}:user:{}:task:*".format(
        project_id, reserve_key, user_id)
    matching_keys = lock_manager.scan_keys(pattern)

    # get_user_tasks maps task id -> timestamp; keep the ids whose lock is still live.
    still_locked_ids = {
        locked_id
        for locked_id, stamp in get_user_tasks(user_id, timeout).items()
        if LockManager.seconds_remaining(stamp) > EXPIRE_LOCK_DELAY
    }

    for key in matching_keys:
        key_task_id = int(key.decode().split(":")[-1])
        # Keep the category lock of another task the user is still working on.
        if key_task_id != task_id and str(key_task_id) in still_locked_ids:
            continue
        lock_manager.release_reserve_task_lock(key, expiry)
        current_app.logger.info("Release reserve task locks: %s, task: %d, project: %s, user: %s", key, key_task_id, project_id, user_id)
523

524

525
def release_reserve_task_lock_by_keys(resource_ids, timeout, pipeline=None, expiry=EXPIRE_RESERVE_TASK_LOCK_DELAY):
    """Release each reserved-category lock key in ``resource_ids``.

    ``pipeline`` is accepted for interface compatibility but not used.
    """
    if not resource_ids:
        return

    manager = LockManager(sentinel.master, timeout)
    for key in resource_ids:
        manager.release_reserve_task_lock(key, expiry)
        current_app.logger.info(
            "Release reserve task lock. resource id %s, expiry %d", key, expiry)
535

536

537
def acquire_reserve_task_lock(project_id, task_id, user_id, timeout, pipeline=None, execute=True):
    """Reserve ``task_id``'s category for ``user_id``; return True on success.

    Only applies to projects using the task-queue scheduler whose
    ``reserve_tasks.category`` fields all have truthy values in ``task.info``.
    The category string is ``field:value`` pairs joined by ``:`` with the
    fields in *sorted* order. This is the same ordering used by
    ``get_reserve_task_key`` (when releasing) and
    ``get_reserve_task_category_info`` (when searching), so the stored key
    can be found again regardless of how the fields are ordered in the
    project config.

    ``pipeline`` and ``execute`` are accepted for interface compatibility but
    are not used.
    """
    task = task_repo.get_task(task_id)
    project = project_repo.get(project_id)
    if not (task and project and project.info.get("sched", "default") in [Schedulers.task_queue]):
        return False

    reserve_task_config = project.info.get("reserve_tasks", {}).get("category", [])
    category_exist = reserve_task_config and all(task.info.get(field, False) for field in reserve_task_config)
    if not category_exist:
        return False

    category = ":".join(
        "{}:{}".format(field, task.info.get(field)) for field in sorted(reserve_task_config))
    lock_manager = LockManager(sentinel.master, timeout)
    if lock_manager.acquire_reserve_task_lock(project_id, task_id, user_id, category):
        current_app.logger.info(
            "Acquired reserve task lock. project %s, task %s, user %s, category %s",
            project_id, task_id, user_id, category
        )
        return True
    return False
560

561

562
def lock_task_for_user(task_id, project_id, user_id):
    """Try to lock the specific task ``task_id`` of ``project_id`` for ``user_id``.

    The task qualifies only if all of the following hold:

    * it belongs to the project;
    * it is not expired;
    * its state is not 'completed' or 'enrich';
    * the user has not already submitted a task run for it.

    On success the task's reserved category is also claimed, and the task is
    registered via ``_lock_task_for_user``.

    Returns:
        ``[task]`` when the lock was acquired, otherwise ``None``.
    """
    # Wrapped in text() like the other queries in this module: passing a raw
    # string to Session.execute is deprecated in SQLAlchemy 1.4 and an error in 2.0.
    sql = text('''
        SELECT task.id, COUNT(task_run.task_id) AS taskcount, n_answers, task.calibration,
            (SELECT info->'timeout'
            FROM project
            WHERE id=:project_id) as timeout
        FROM task
        LEFT JOIN task_run ON (task.id = task_run.task_id)
        WHERE NOT EXISTS
        (SELECT 1 FROM task_run WHERE project_id=:project_id AND
        user_id=:user_id AND task_id=task.id)
        AND task.project_id=:project_id
        AND task.id = :task_id
        AND ((task.expiration IS NULL) OR (task.expiration > (now() at time zone 'utc')::timestamp))
        AND task.state !='completed'
        AND task.state !='enrich'
        group by task.id
        ''')

    rows = session.execute(sql, dict(project_id=project_id,
                                     user_id=user_id,
                                     task_id=task_id))
    for task_id, taskcount, n_answers, calibration, timeout in rows:
        timeout = timeout or TIMEOUT
        # Gold (calibration) tasks get an unbounded lock limit; other tasks
        # are limited to the answers still needed.
        remaining = float('inf') if calibration else n_answers - taskcount
        if acquire_locks(task_id, user_id, remaining, timeout):
            # reserve tasks
            acquire_reserve_task_lock(project_id, task_id, user_id, timeout)
            return _lock_task_for_user(task_id, project_id, user_id, timeout, calibration)
    return None
591

592

593
def _lock_task_for_user(task_id, project_id, user_id, timeout, calibration=False):
    """Record a freshly locked task and return it as a one-element list.

    Caches the task -> project mapping for twice the lock timeout, registers
    the user as active in the project for ``timeout``, and logs the
    assignment.
    """
    save_task_id_project_id(task_id, project_id, 2 * timeout)
    register_active_user(project_id, user_id, sentinel.master, ttl=timeout)

    label = 'gold task' if calibration else 'task'
    message = 'Project {} - user {} obtained {} {}, timeout: {}'.format(
        project_id, user_id, label, task_id, timeout)
    current_app.logger.info(message)
    return [session.query(Task).get(task_id)]
602

603

604
def release_user_locks_for_project(user_id, project_id):
    """Release every task lock ``user_id`` holds on tasks of ``project_id``.

    Each locked task's project is read from the cached task -> project mapping
    (``get_task_ids_project_id``). When that entry is missing, the task is
    loaded from the repository instead. Tasks that no longer exist are
    skipped, because their project cannot be determined.

    Returns:
        The list of task ids whose locks were released.
    """
    user_tasks = get_user_tasks(user_id, TIMEOUT)
    user_task_ids = list(user_tasks.keys())
    project_ids = get_task_ids_project_id(user_task_ids)
    task_ids = []
    for task_id, task_project_id in zip(user_task_ids, project_ids):
        if not task_project_id:
            task = task_repo.get_task(task_id)
            if not task:
                # The task was deleted; there is no project to compare against.
                continue
            task_project_id = task.project_id
        if int(task_project_id) == project_id:
            release_lock(task_id, user_id, TIMEOUT)
            task_ids.append(task_id)
    current_app.logger.info('released user id {} locks on tasks {}'.format(user_id, task_ids))
    return task_ids
617

618

619
def release_lock(task_id, user_id, timeout, pipeline=None, execute=True):
    """Release the lock ``user_id`` holds on ``task_id``.

    Queues removal of the user from the task's lock store and of the task
    from the user's lock store on ``pipeline``. If no pipeline is given, a
    new transactional one is created. When, according to the task -> project
    cache, the user holds no other lock on the same project, the user is
    also unregistered as active on that project.

    :param pipeline: optional redis pipeline to queue the releases on.
    :param execute: when False the caller must execute ``pipeline``.
    """
    redis_conn = sentinel.master
    # Explicit None check: an empty redis Pipeline defines __len__ and can
    # evaluate falsy, which would silently swap out the caller's pipeline.
    if pipeline is None:
        pipeline = redis_conn.pipeline(transaction=True)
    lock_manager = LockManager(redis_conn, timeout)
    task_users_key = get_task_users_key(task_id)
    user_tasks_key = get_user_tasks_key(user_id)
    lock_manager.release_lock(task_users_key, user_id, pipeline=pipeline)
    lock_manager.release_lock(user_tasks_key, task_id, pipeline=pipeline)

    # mget returns one entry per key, so the list itself is never empty;
    # the entry is None when the cached task -> project mapping is gone.
    project_ids = get_task_ids_project_id([task_id])
    project_id = project_ids[0] if project_ids else None
    remaining_user_tasks_id = [t for t in get_user_tasks(user_id, timeout).keys() if t != str(task_id)]
    if project_id is not None:
        if project_id not in get_task_ids_project_id(remaining_user_tasks_id):
            unregister_active_user(project_id, user_id, sentinel.master)

    if execute:
        pipeline.execute()
636

637

638
def get_locks(task_id, timeout):
    """Return the locks currently recorded for ``task_id``.

    Reads the task's user-lock key through a LockManager bound to the redis
    master with the given ``timeout``.
    """
    manager = LockManager(sentinel.master, timeout)
    return manager.get_locks(get_task_users_key(task_id))
642

643

644
def get_user_tasks(user_id, timeout):
    """Return the task locks currently recorded for ``user_id``.

    Reads the user's task-lock key through a LockManager bound to the redis
    master with the given ``timeout``. Callers in this module treat the
    result as a mapping keyed by task id.
    """
    manager = LockManager(sentinel.master, timeout)
    return manager.get_locks(get_user_tasks_key(user_id))
648

649

650
def save_task_id_project_id(task_id, project_id, timeout):
    """Cache which project ``task_id`` belongs to.

    The mapping is written to the redis master with ``SETEX`` and expires
    after ``timeout`` seconds.
    """
    cache_key = get_task_id_project_id_key(task_id)
    redis_conn = sentinel.master
    redis_conn.setex(cache_key, timeout, project_id)
653

654

655
def get_task_ids_project_id(task_ids):
    """Look up the cached project id of each task in ``task_ids``.

    Performs a single redis ``MGET`` and returns its values in the same
    order as ``task_ids`` (``None`` for missing keys). An empty input
    returns ``[]`` without contacting redis.
    """
    cache_keys = [get_task_id_project_id_key(tid) for tid in task_ids]
    if not cache_keys:
        return []
    return sentinel.master.mget(cache_keys)
660

661

662
def get_task_id_and_duration_for_project_user(project_id, user_id):
    """Returns the max seconds remaining locked task for a user and project.

    Only locks ``user_id`` holds on tasks of ``project_id`` are considered.
    A missing task -> project cache entry is filled from the database and
    re-cached for ``2 * TIMEOUT`` seconds. A lock whose task no longer
    exists is logged and ignored.

    :returns: ``(task_id, seconds_remaining)`` for the lock with the most
        time left, or ``(None, -1)`` when no lock on the project is held.
    """
    locks = get_user_tasks(user_id, TIMEOUT)
    locked_ids = list(locks.keys())
    cached_projects = get_task_ids_project_id(locked_ids)
    best_task_id = -1
    best_remaining = float('-inf')
    for locked_id, owner_project in zip(locked_ids, cached_projects):
        if not owner_project:
            task = task_repo.get_task(locked_id)
            if not task:
                # No task found for task_id.
                current_app.logger.info(
                    "Project {}, User {}, Task Id {} - task not found in get_task_id_and_duration_for_project_user()."
                    .format(project_id, user_id, locked_id))
            else:
                owner_project = task.project_id
                save_task_id_project_id(locked_id, owner_project, 2 * TIMEOUT)

        # Skip locks whose project is unknown or belongs elsewhere.
        if not owner_project or int(owner_project) != project_id:
            continue
        remaining = LockManager.seconds_remaining(locks[locked_id])
        if remaining > best_remaining:
            best_task_id = int(locked_id)
            best_remaining = remaining

    if best_task_id <= 0:
        return None, -1
    return best_task_id, best_remaining
689

690

691
def release_user_locks(user_id):
    """Release every task lock held by ``user_id`` across all projects.

    All releases are queued on one transactional pipeline on the redis
    master, which is executed once at the end.
    """
    redis_conn = sentinel.master
    pipe = redis_conn.pipeline(transaction=True)
    locked_task_ids = get_user_tasks(user_id, TIMEOUT).keys()
    for locked_id in locked_task_ids:
        release_lock(locked_id, user_id, TIMEOUT, pipeline=pipe, execute=False)
    pipe.execute()
697

698

699
def get_project_scheduler_and_timeout(project_id):
    """Return ``(scheduler, timeout)`` for the project with ``project_id``.

    :raises Forbidden: if ``project_repo`` has no such project.
    """
    project = project_repo.get(project_id)
    if project:
        return get_scheduler_and_timeout(project)
    raise Forbidden('Invalid project_id')
704

705

706
def get_scheduler_and_timeout(project):
    """Return the scheduler name and lock timeout configured on ``project``.

    Both values are read from ``project.info``. A missing or ``'default'``
    scheduler resolves to DEFAULT_SCHEDULER, and a missing timeout falls
    back to TIMEOUT.
    """
    info = project.info
    sched_name = info.get('sched', 'default')
    lock_timeout = info.get('timeout', TIMEOUT)
    resolved = DEFAULT_SCHEDULER if sched_name == 'default' else sched_name
    return resolved, lock_timeout
712

713

714
def has_read_access(user):
    """Return whether ``user`` is a signed-in admin or subadmin.

    Anonymous users never qualify. Otherwise the result is
    ``user.admin or user.subadmin``.
    """
    if user.is_anonymous:
        return False
    return user.admin or user.subadmin
716

717

718
def get_project_scheduler(project_id, conn):
    """Return the raw scheduler name stored in the project's ``info`` JSON.

    Queries ``info->>'sched'`` through ``conn``. Returns ``'default'`` when
    the project row is missing or has no scheduler set. Unlike
    ``get_scheduler_and_timeout``, it does not map ``'default'`` to
    DEFAULT_SCHEDULER.
    """
    query = text('''
        SELECT info->>'sched' as sched FROM project WHERE id=:project_id;
        ''')
    result = conn.execute(query, dict(project_id=project_id)).first()
    sched = result.sched if result else None
    return sched or 'default'
726

727

728
def sched_variants():
    """Return the available scheduler choices as ``(value, label)`` pairs."""
    variants = [('default', 'Default')]
    variants.append((Schedulers.locked, 'Locked'))
    variants.append((Schedulers.user_pref, 'User Preference Scheduler'))
    variants.append((Schedulers.task_queue, 'Task Queues'))
    return variants
734

735

736
def randomizable_scheds():
    """Return the scheduler names listed as randomizable.

    The list is the locked and user-preference schedulers. ``'default'`` is
    appended only when DEFAULT_SCHEDULER is itself one of them.
    """
    base = [Schedulers.locked, Schedulers.user_pref]
    extra = ['default'] if DEFAULT_SCHEDULER in base else []
    return base + extra
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc