• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bloomberg / pybossa / 19114618549

05 Nov 2025 07:59PM UTC coverage: 94.093% (+0.03%) from 94.065%
19114618549

Pull #1069

github

peterkle
add more tests
Pull Request #1069: RDISCROWD-8392 upgrade to boto3

214 of 223 new or added lines in 7 files covered. (95.96%)

152 existing lines in 8 files now uncovered.

17920 of 19045 relevant lines covered (94.09%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.59
/pybossa/redis_lock.py
1
# -*- coding: utf8 -*-
2
# This file is part of PYBOSSA.
3
#
4
# Copyright (C) 2018 Scifabric LTD.
5
#
6
# PYBOSSA is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# PYBOSSA is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with PYBOSSA.  If not, see <http://www.gnu.org/licenses/>.
18

19
import json
1✔
20
from datetime import timedelta
1✔
21
from time import time
1✔
22

23
from pybossa.contributions_guard import ContributionsGuard
1✔
24
from pybossa.core import sentinel
1✔
25
from werkzeug.exceptions import BadRequest
1✔
26

27
# Redis key templates. Each one is completed with str.format() by the
# matching get_*_key() helper in this module.
# Hash of the users holding a lock on a task (formatted with the task id).
TASK_USERS_KEY_PREFIX = 'pybossa:project:task_requested:timestamps:{0}'
# Hash of the tasks locked by a user (formatted with the user id).
USER_TASKS_KEY_PREFIX = 'pybossa:user:task_acquired:timestamps:{0}'
# Key mapping a task id to its project id (formatted with the task id).
TASK_ID_PROJECT_ID_KEY_PREFIX = 'pybossa:task_id:project_id:{0}'
# Hash of the active users of a project (formatted with the project id).
ACTIVE_USER_KEY = 'pybossa:active_users_in_project:{}'

# Seconds a released lock / unregistered user stays visible before expiring.
EXPIRE_LOCK_DELAY = 5
# Extra seconds added on top of the lock duration for task reservations.
EXPIRE_RESERVE_TASK_LOCK_DELAY = 30 * 60
1✔
33

34

35
def get_active_user_key(project_id):
    """Build the Redis key of the active-users hash for ``project_id``."""
    active_users_key = ACTIVE_USER_KEY.format(project_id)
    return active_users_key
1✔
37

38
def get_task_users_key(task_id):
    """Build the Redis key of the hash of users holding a lock on a task.

    :param task_id: task id; bytes ids are decoded to str first
    :return: the formatted key as str
    """
    # isinstance (rather than an exact type comparison) also covers
    # subclasses of bytes.
    if isinstance(task_id, bytes):
        task_id = task_id.decode()
    return TASK_USERS_KEY_PREFIX.format(task_id)
1✔
42

43
def get_user_tasks_key(user_id):
    """Build the Redis key of the hash of tasks locked by a user.

    :param user_id: user id; bytes ids are decoded to str first
    :return: the formatted key as str
    """
    # bytes to unicode string; isinstance also covers subclasses of bytes.
    if isinstance(user_id, bytes):
        user_id = user_id.decode()
    return USER_TASKS_KEY_PREFIX.format(user_id)
1✔
48

49
def get_task_id_project_id_key(task_id):
    """Build the Redis key that maps a task id to its project id.

    :param task_id: task id; bytes ids are decoded to str first, matching
        the other key helpers in this module
    :return: the formatted key as str
    """
    # Without decoding, a bytes id would be formatted as "b'12'".
    if isinstance(task_id, bytes):
        task_id = task_id.decode()
    return TASK_ID_PROJECT_ID_KEY_PREFIX.format(task_id)
1✔
51

52
def get_active_user_count(project_id, conn):
    """Purge expired entries from a project's active-users hash and count the rest.

    :param project_id: project whose active users are counted
    :param conn: Redis connection used for the hash operations
    :return: number of fields left in the hash after the purge
    """
    key = get_active_user_key(project_id)
    now = time()

    # An entry is stale once its stored expiration timestamp is in the past.
    expired_users = []
    for user, expiration in conn.hgetall(key).items():
        if float(expiration) < now:
            expired_users.append(user)

    if expired_users:
        conn.hdel(key, *expired_users)
    return conn.hlen(key)
1✔
60

61

62
def register_active_user(project_id, user_id, conn, ttl=2*60*60):
    """Record ``user_id`` as active in ``project_id`` for ``ttl`` seconds.

    :param project_id: project the user is working on
    :param user_id: user to register
    :param conn: Redis connection used for the hash operations
    :param ttl: seconds until the entry expires; also set as the TTL of
        the whole hash
    """
    key = get_active_user_key(project_id)
    expires_at = time() + ttl
    conn.hset(key, user_id, expires_at)
    conn.expire(key, ttl)
1✔
67

68

69
def unregister_active_user(project_id, user_id, conn):
    """Schedule ``user_id`` to drop out of a project's active-users hash.

    The entry is not deleted here; its expiration is moved to
    EXPIRE_LOCK_DELAY seconds from now.

    :param project_id: project the user was working on
    :param user_id: user to unregister
    :param conn: Redis connection used for the hash operation
    """
    key = get_active_user_key(project_id)
    expires_at = time() + EXPIRE_LOCK_DELAY
    conn.hset(key, user_id, expires_at)
1✔
73

74

75
def get_locked_tasks_project(project_id):
    """Returns a list of locked tasks for a given project.

    For every user found in the project's active-users hash, the locks held
    by that user are looked up and filtered down to tasks that belong to
    ``project_id`` and whose lock has not yet expired.

    :param project_id: id of the project; compared against
        ``int(<task's project id>)``
    :return: list of dicts with keys ``user_id``, ``task_id`` and
        ``seconds_remaining``
    """
    tasks = []
    # Use one connection object throughout instead of re-reading
    # sentinel.master for each call.
    redis_conn = sentinel.master
    timeout = ContributionsGuard.STAMP_TTL
    lock_manager = LockManager(redis_conn, timeout)

    # Get the active users key for this project.
    key = get_active_user_key(project_id)

    # Get the users for each locked task. Only the hash fields (user ids)
    # are needed, so iterate the mapping's keys directly.
    for user_id in redis_conn.hgetall(key):
        # Redis client in Python returns bytes string
        if isinstance(user_id, bytes):
            user_id = user_id.decode()

        # Get locks by user.
        user_tasks_key = get_user_tasks_key(user_id)
        user_tasks = lock_manager.get_locks(user_tasks_key)
        if not user_tasks:
            # No locks for this user: nothing to look up or report.
            continue

        # Get task ids for the locks, then project ids for the task ids.
        user_task_ids = list(user_tasks)
        keys = [get_task_id_project_id_key(t) for t in user_task_ids]
        results = redis_conn.mget(keys)

        # For each locked task, check if the lock is still active.
        for task_id, task_project_id in zip(user_task_ids, results):
            if not task_project_id:
                # Import at runtime due to order of execution for global initialization of task_repo.
                from pybossa.core import task_repo
                task = task_repo.get_task(task_id)
                if task:
                    task_project_id = task.project_id
                else:
                    # Locked task has been deleted: release both sides of
                    # the lock (task -> user and user -> task).
                    task_users_key = get_task_users_key(task_id)
                    lock_manager.release_lock(task_users_key, user_id)
                    lock_manager.release_lock(user_tasks_key, task_id)

            # Match the requested project id.
            if task_project_id and int(task_project_id) == project_id:
                # Calculate seconds remaining.
                seconds_remaining = LockManager.seconds_remaining(user_tasks[task_id])
                if seconds_remaining > 0:
                    # This lock has not yet expired.
                    tasks.append({
                        "user_id": user_id,
                        "task_id": task_id,
                        "seconds_remaining": seconds_remaining
                    })
    return tasks
1✔
129

130

131
class LockManager:
    """
    Class to manage resource locks.

    A resource is a Redis hash whose fields are client ids and whose values
    are expiration timestamps (seconds since the epoch).

    :param cache: a Redis connection
    :param duration: how long a lock is valid after being acquired
        if not released (in seconds)
    """

    # Settings (in seconds) of the short-lived mutex that guards the
    # count-then-insert step of acquire_lock.
    MUTEX_TIMEOUT = 3
    MUTEX_BLOCKING_TIMEOUT = 1

    def __init__(self, cache, duration):
        self._redis = cache
        self._duration = duration

    @staticmethod
    def _to_text(value):
        """Return ``value`` as str, decoding it first when it is bytes."""
        return value.decode() if isinstance(value, bytes) else value

    def _store_lock(self, pipeline, resource_id, client_id, expiration):
        """Write the lock entry, refresh the hash TTL and run the pipeline."""
        pipeline.hset(resource_id, client_id, expiration)
        pipeline.expire(resource_id, timedelta(seconds=self._duration))
        pipeline.execute()

    def acquire_lock(self, resource_id, client_id, limit):
        """
        Acquire a lock on a resource.
        :param resource_id: resource on which lock is needed
        :param client_id: id of client needing the lock
        :param limit: how many clients can access the resource concurrently
        :return: True if lock was successfully acquired, else False
        """
        timestamp = time()
        expiration = timestamp + self._duration
        self._release_expired_locks(resource_id, timestamp)
        if self._redis.hexists(resource_id, client_id):
            return True

        pipeline = self._redis.pipeline()
        if limit == float('inf'):
            # No cap on concurrent holders: no need to count under a mutex.
            self._store_lock(pipeline, resource_id, client_id, expiration)
            return True

        # Get a mutex lock for updating redis hash with default TTL 3s
        lock_name = f"{resource_id}_update_mutex"
        acquired = False

        try:
            with self._redis.lock(
                lock_name,
                timeout=self.MUTEX_TIMEOUT,
                blocking_timeout=self.MUTEX_BLOCKING_TIMEOUT,
            ) as mutex:
                if not mutex.locked():
                    return False

                num_acquired = self._redis.hlen(resource_id)
                if num_acquired < limit:
                    self._store_lock(pipeline, resource_id, client_id, expiration)
                    acquired = True
        except Exception:
            # Best effort: any failure around the mutex or the write is
            # reported through the return value, as before. Unlike a
            # ``return`` inside ``finally``, this lets KeyboardInterrupt and
            # SystemExit propagate.
            pass
        return acquired

    def has_lock(self, resource_id, client_id):
        """
        :param resource_id: resource on which lock is being held
        :param client_id: client id
        :return: True if client id holds a lock on the resource,
        False otherwise
        """
        # A single HGET avoids the window between HEXISTS and HGET in which
        # the field could disappear and leave None to convert.
        time_str = self._redis.hget(resource_id, client_id)
        if time_str is None:
            return False
        return float(time_str) > time()

    def release_lock(self, resource_id, client_id, pipeline=None):
        """
        Release a lock. Note that the lock is not released immediately, rather
        its expiration is set after a short interval from the current time.
        This is done so that concurrent requests will still see the lock and
        avoid race conditions due to possibly stale data already retrieved from
        the database.
        :param resource_id: resource on which lock is being held
        :param client_id: id of client holding the lock
        :param pipeline: object that can queue multiple commands for later execution
        """
        cache = pipeline if pipeline is not None else self._redis
        cache.hset(resource_id, client_id, time() + EXPIRE_LOCK_DELAY)

    def get_locks(self, resource_id):
        """
        Get all locks associated with a particular resource.
        :param resource_id: resource on which lock is being held
        :return: dict mapping client id to expiration, both as str
        """
        locks = self._redis.hgetall(resource_id)

        # By default, all responses are returned as bytes in Python 3 and
        # str in Python 2 - per https://github.com/andymccurdy/redis-py
        # Values that are already str are passed through unchanged.
        to_text = self._to_text
        return {to_text(k): to_text(v) for k, v in locks.items()}

    def get_reservation_keys(self, resource_id):
        """
        Get all reservation key/resource_id associated with partial resource information.
        :param resource_id: resource on project/task/user
        :return: list of matching keys as str
        """
        # NOTE(review): KEYS walks the whole keyspace; consider SCAN
        # (see scan_keys) if this becomes a bottleneck - confirm with callers.
        reservations = self._redis.keys(resource_id) or []
        return [self._to_text(k) for k in reservations]

    def _release_expired_locks(self, resource_id, now):
        """Delete every lock of ``resource_id`` whose expiration is before ``now``."""
        locks = self.get_locks(resource_id)
        to_delete = [
            key for key, expiration in locks.items()
            if now > float(expiration)
        ]
        if to_delete:
            self._redis.hdel(resource_id, *to_delete)

    def _release_expired_reserve_for_project(self, project_id):
        """Delete the expired task reservations of ``project_id``."""
        resource_id = "reserve_task:project:{}:category:*:user:*:task:*".format(project_id)
        timestamp = time()

        for reservation_key in self.get_reservation_keys(resource_id):
            self._release_expired_reserve_task_locks(reservation_key, timestamp)

    def _release_expired_reserve_task_locks(self, resource_id, now):
        """Delete reservation ``resource_id`` when its stored expiration is before ``now``."""
        # A missing (or empty) value counts as expiration 0, i.e. expired.
        expiration = self._redis.get(resource_id) or 0
        if now > float(expiration):
            self._redis.delete(resource_id)

    @staticmethod
    def seconds_remaining(expiration):
        """Return how many seconds are left until ``expiration`` (negative if past)."""
        return float(expiration) - time()

    def get_task_category_lock(self, project_id, user_id=None, category=None, exclude_user=False, task_id=None):
        """
        Returns the list of reservation keys matching the given project,
        category, user and task, after releasing expired reservations.
        To fetch task category for all users who've reserved the category, pass user_id = None
        To fetch task category for all tasks reserved, pass task_id = None
        To fetch task category other than user_id, pass exclude_user = True
        :raises BadRequest: if project_id is missing, or if exclude_user is
            set without a user_id
        """

        if not project_id:
            raise BadRequest('Missing required parameters')

        # with exclude_user set to True, user_id is to be excluded from list of
        # task category found for all users. raise error if user_id not passed
        if exclude_user and not user_id:
            raise BadRequest('Missing user id')

        # release expired task reservations
        self._release_expired_reserve_for_project(project_id)

        resource_id = "reserve_task:project:{}:category:{}:user:{}:task:{}".format(
            project_id,
            "*" if not category else category,
            "*" if not user_id or exclude_user else user_id,
            "*" if not task_id else task_id
        )

        category_keys = self.get_reservation_keys(resource_id)

        # if key present but for different user, with redundancy = 1, return false
        # TODO: for redundancy > 1, check if additional task run
        # available for this user and if so, return category_key else ""
        if exclude_user:
            # exclude user_id from list of keys passed
            drop_user = ":user:{}:task:".format(user_id)
            category_keys = [key for key in category_keys if drop_user not in key]
        return category_keys

    def acquire_reserve_task_lock(self, project_id, task_id, user_id, category):
        """
        Reserve a task category for a user by storing an expiration timestamp
        under the reservation key.
        :return: the result of the Redis SET call
        :raises BadRequest: if any of the parameters is missing
        """
        if not (project_id and user_id and task_id and category):
            raise BadRequest('Missing required parameters')

        # check task category reserved by user
        resource_id = "reserve_task:project:{}:category:{}:user:{}:task:{}".format(project_id, category, user_id, task_id)

        timestamp = time()
        self._release_expired_reserve_task_locks(resource_id, timestamp)
        expiration = timestamp + self._duration + EXPIRE_RESERVE_TASK_LOCK_DELAY
        return self._redis.set(resource_id, expiration)

    def release_reserve_task_lock(self, resource_id, expiry):
        """Set the TTL of reservation ``resource_id`` to ``expiry``."""
        self._redis.expire(resource_id, expiry)

    def scan_keys(self, pattern):
        """Return an iterator over the keys matching ``pattern`` (via SCAN)."""
        return self._redis.scan_iter(pattern)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc