• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rero / rero-ils / 16145351338

08 Jul 2025 01:57PM UTC coverage: 92.101% (-0.09%) from 92.188%
16145351338

Pull #3892

github

web-flow
Merge b539f3fb8 into 43159ef46
Pull Request #3892: feat(circulation): add number of requests limits

17 of 40 new or added lines in 2 files covered. (42.5%)

2 existing lines in 2 files now uncovered.

23424 of 25433 relevant lines covered (92.1%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.77
/rero_ils/modules/patron_types/api.py
1
# -*- coding: utf-8 -*-
2
#
3
# RERO ILS
4
# Copyright (C) 2019-2025 RERO+
5
# Copyright (C) 2019-2022 UCLouvain
6
#
7
# This program is free software: you can redistribute it and/or modify
8
# it under the terms of the GNU Affero General Public License as published by
9
# the Free Software Foundation, version 3 of the License.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18

19
"""API for manipulating patron types."""
20

21
from __future__ import absolute_import, print_function
1✔
22

23
from functools import partial
1✔
24

25
from elasticsearch_dsl import Q
1✔
26
from flask_babel import gettext as _
1✔
27

28
from ..api import IlsRecord, IlsRecordsIndexer, IlsRecordsSearch
1✔
29
from ..circ_policies.api import CircPoliciesSearch
1✔
30
from ..fetchers import id_fetcher
1✔
31
from ..loans.api import get_loans_count_by_library_for_patron_pid, get_overdue_loan_pids
1✔
32
from ..loans.models import LoanState
1✔
33
from ..minters import id_minter
1✔
34
from ..patron_transactions.utils import get_transactions_total_amount_for_patron
1✔
35
from ..patrons.api import Patron, PatronsSearch
1✔
36
from ..providers import Provider
1✔
37
from ..utils import get_patron_from_arguments, sorted_pids
1✔
38
from .models import PatronTypeIdentifier, PatronTypeMetadata
1✔
39

40
# Provider class for patron types, built dynamically with pid type "ptty".
PatronTypeProvider = type(
    "PatronTypeProvider",
    (Provider,),
    {"identifier": PatronTypeIdentifier, "pid_type": "ptty"},
)
# Minter: `id_minter` pre-bound to the patron type provider.
patron_type_id_minter = partial(id_minter, provider=PatronTypeProvider)
# Fetcher: `id_fetcher` pre-bound to the patron type provider.
patron_type_id_fetcher = partial(id_fetcher, provider=PatronTypeProvider)
50

51

52
class PatronTypesSearch(IlsRecordsSearch):
    """Search class for patron type records."""

    class Meta:
        """Search configuration: queries target the `patron_types` index."""

        index = "patron_types"
        doc_types = None
        fields = ("*",)
        facets = {}

        default_filter = None

    def by_organisation_pid(self, organisation_pid):
        """Restrict the search to the hits of a single organisation.

        :param organisation_pid: string - the organisation pid to filter with
        :returns: the search with a `term` filter on the organisation pid.
        :rtype: `elasticsearch_dsl.Search`
        """
        organisation_filter = {"organisation__pid": organisation_pid}
        return self.filter("term", **organisation_filter)
73

74

75
class PatronType(IlsRecord):
1✔
76
    """PatronType class."""
77

78
    minter = patron_type_id_minter
1✔
79
    fetcher = patron_type_id_fetcher
1✔
80
    provider = PatronTypeProvider
1✔
81
    model_cls = PatronTypeMetadata
1✔
82
    pids_exist_check = {
1✔
83
        "required": {
84
            "org": "organisation",
85
        }
86
    }
87

88
    def extended_validation(self, **kwargs):
        """Add additional record validation.

        The same coherence rules are applied to the checkout limits first,
        then to the request limits:
          * the library limit cannot be higher than the global limit;
          * a library exception cannot have the same value as the library
            limit;
          * only one exception per library is allowed.

        :param kwargs: not used by this method.
        :return: True if the limits are coherent, otherwise the translated
                 message describing the first problem found.
        """
        limits = self.get("limits", {})
        for limits_key in ("checkout_limits", "request_limits"):
            limits_data = limits.get(limits_key)
            if not limits_data:
                continue
            library_limit = limits_data.get("library_limit")
            if not library_limit:
                # Without a library limit there is nothing to cross-check.
                continue

            # Library limit cannot be higher than global limit. The global
            # limit may be absent: comparing an int with None would raise a
            # TypeError, so the comparison is skipped in that case.
            global_limit = limits_data.get("global_limit")
            if global_limit is not None and library_limit > global_limit:
                return _("Library limit cannot be higher than global limit.")

            # Exception limit cannot have same value as library limit
            # Only one exception per library
            seen_library_refs = set()
            for exception in limits_data.get("library_exceptions", []):
                if exception.get("value") == library_limit:
                    # The two messages are distinct translation keys; both
                    # historical wordings are kept unchanged.
                    if limits_key == "checkout_limits":
                        return _(
                            "Exception limit cannot have same value than library limit"
                        )
                    return _(
                        "Exception limit cannot have same value as library limit"
                    )
                ref = exception.get("library").get("$ref")
                if ref in seen_library_refs:
                    return _("Only one specific limit by library allowed.")
                seen_library_refs.add(ref)
        return True
135

136
    @classmethod
    def exist_name_and_organisation_pid(cls, name, organisation_pid):
        """Look for a patron type with this name inside an organisation.

        :param name: the patron type name to search for.
        :param organisation_pid: the pid of the organisation to search in.
        :return: the first matching search hit, or None if there is none.
        """
        search = PatronTypesSearch()
        search = search.filter("term", patron_type_name=name)
        search = search.filter("term", organisation__pid=organisation_pid)
        hits = search.source().scan()
        # `next` with a default replaces the explicit StopIteration handling.
        return next(hits, None)
150

151
    @classmethod
    def get_yearly_subscription_patron_types(cls):
        """Yield the patron types having a subscription amount above zero.

        :return: a generator of records loaded with `get_record_by_pid`.
        """
        search = PatronTypesSearch().filter(
            "range", subscription_amount={"gt": 0}
        )
        # Only the pid is fetched from the index; records are loaded lazily.
        for hit in search.source("pid").scan():
            yield cls.get_record_by_pid(hit.pid)
162

163
    @classmethod
    def can_checkout(cls, item, **kwargs):
        """Check if the patron type allows the checkout operation.

        The checks run in this order and stop at the first failure: overdue
        items limit, checkout count limit, fee amount limit, unpaid
        subscription.

        :param item: the item to check
        :param kwargs: to be relevant, additional arguments should contain
                       a 'patron' argument.
        :return: a tuple with True|False and reasons to disallow if False.
        """
        patron = get_patron_from_arguments(**kwargs)
        if not patron:
            # No patron could be found from the arguments: the check is not
            # relevant --> allow by default.
            return True, []

        ptty = PatronType.get_record_by_pid(patron.patron_type_pid)

        # overdue items limit
        overdue_ok = ptty.check_overdue_items_limit(patron)
        if not overdue_ok:
            return False, [
                _("Checkout denied: the maximal number of overdue items is reached")
            ]

        # checkout count limit
        count_ok, reason = ptty.check_checkout_count_limit(patron, item)
        if not count_ok:
            return False, [reason]

        # fee amount limit
        fee_ok = ptty.check_fee_amount_limit(patron)
        if not fee_ok:
            return False, [
                _("Checkout denied: the maximal overdue fee amount is reached")
            ]

        # unpaid subscription
        subscription_ok = ptty.check_unpaid_subscription(patron)
        if not subscription_ok:
            return False, [_("Checkout denied: patron has unpaid subscription")]

        return True, []
197

198
    @classmethod
    def can_request(cls, item, **kwargs):
        """Check if the patron type allows the request operation.

        The checks run in this order and stop at the first failure: overdue
        items limit, request limits, fee amount limit, unpaid subscription.

        :param item: the item to check
        :param kwargs: to be relevant, additional arguments should contain
                       a 'patron' argument.
        :return: a tuple with True|False and reasons to disallow if False.
        """
        patron = get_patron_from_arguments(**kwargs)
        if not patron:
            # No patron could be found from the arguments: the check is not
            # relevant --> allow by default.
            return True, []

        ptty = PatronType.get_record_by_pid(patron.patron_type_pid)

        # overdue items limit
        overdue_ok = ptty.check_overdue_items_limit(patron)
        if not overdue_ok:
            return False, [
                _("Request denied: the maximal number of overdue items is reached")
            ]

        # request limits
        requests_ok, reason = ptty.check_request_limits(patron, item)
        if not requests_ok:
            return False, [reason]

        # fee amount limit
        fee_ok = ptty.check_fee_amount_limit(patron)
        if not fee_ok:
            return False, [
                _("Request denied: the maximal overdue fee amount is reached")
            ]

        # unpaid subscription
        subscription_ok = ptty.check_unpaid_subscription(patron)
        if not subscription_ok:
            return False, [_("Request denied: patron has unpaid subscription")]

        return True, []
232

233
    @classmethod
    def can_extend(cls, item, **kwargs):
        """Check if the patron type allows the extend (renewal) operation.

        The checks run in this order and stop at the first failure: overdue
        items limit, fee amount limit, unpaid subscription.

        :param item: the item to check
        :param kwargs: to be relevant, additional arguments should contain
                       a 'patron' argument.
        :return: a tuple with True|False and reasons to disallow if False.
        """
        patron = get_patron_from_arguments(**kwargs)
        if not patron:
            # No patron could be found from the arguments: the check is not
            # relevant --> allow by default.
            return True, []

        ptty = PatronType.get_record_by_pid(patron.patron_type_pid)

        # overdue items limit
        overdue_ok = ptty.check_overdue_items_limit(patron)
        if not overdue_ok:
            return False, [
                _("Renewal denied: the maximal number of overdue items is reached")
            ]

        # fee amount limit
        fee_ok = ptty.check_fee_amount_limit(patron)
        if not fee_ok:
            return False, [
                _("Renewal denied: the maximal overdue fee amount is reached")
            ]

        # unpaid subscription
        subscription_ok = ptty.check_unpaid_subscription(patron)
        if not subscription_ok:
            return False, [_("Renewal denied: patron has unpaid subscription")]

        return True, []
262

263
    def get_linked_patron(self):
        """Yield every patron linked to this patron type.

        :return: a generator of `Patron` records.
        """
        search = PatronsSearch().filter("term", patron__type__pid=self.pid)
        # Only the pid is fetched from the index; records are loaded lazily.
        for hit in search.source("pid").scan():
            yield Patron.get_record_by_pid(hit.pid)
273

274
    @property
1✔
275
    def is_subscription_required(self):
1✔
276
        """Check if a subscription is required for this patron type."""
277
        return self.get("subscription_amount", 0) > 0
1✔
278

279
    def get_links_to_me(self, get_pids=False):
        """Collect the records linked to this patron type.

        :param get_pids: if True list of linked pids
                         if False count of linked records
        :return: a dict with the optional keys 'patrons' and
                 'circ_policies'; a key is present only when its value is
                 not empty/zero.
        """
        patron_search = PatronsSearch().filter("term", patron__type__pid=self.pid)
        policy_search = CircPoliciesSearch().filter(
            "nested",
            path="settings",
            query=Q("bool", must=[Q("match", settings__patron_type__pid=self.pid)]),
        )
        if get_pids:
            linked = (
                ("patrons", sorted_pids(patron_search)),
                ("circ_policies", sorted_pids(policy_search)),
            )
        else:
            linked = (
                ("patrons", patron_search.count()),
                ("circ_policies", policy_search.count()),
            )
        # Keep only the resource types that really have linked records.
        return {resource: value for resource, value in linked if value}
303

304
    def reasons_not_to_delete(self):
        """Get reasons not to delete record.

        :return: a dict holding a 'links' key when `get_links_to_me` reports
                 linked records, an empty dict otherwise.
        """
        links = self.get_links_to_me()
        return {"links": links} if links else {}
310

311
    # CHECK LIMITS METHODS ====================================================
312
    def check_overdue_items_limit(self, patron):
        """Check that a patron is still under the overdue items limit.

        :param patron: the patron who tries to execute the operation.
        :return: False if the number of overdue loans of the patron reaches
                 or exceeds the configured limit. True in all other cases,
                 including when no limit is configured.
        """
        limits = self.get("limits", {})
        limit = limits.get("overdue_items_limits", {}).get("default_value")
        if not limit:
            # No limit configured (or 0): nothing to check.
            return True
        overdue_count = len(list(get_overdue_loan_pids(patron.pid)))
        return overdue_count < limit
325

326
    def check_request_limits(self, patron, item=None):
        """Check if a patron reached the request limits.

        * check the global general limit (if exists).
        * check the library exception limit (if exists).
        * check the library default limit (if exists).
        :param patron: the patron who tries to request the item.
        :param item: the item related to the loan (optional).
        :return a tuple of two values ::
          - True|False : to know if the check is success or not.
          - message(string) : the reason why the check fails.
        """
        limits = self.replace_refs().get("limits", {})
        request_limits = limits.get("request_limits", {})
        global_limit = request_limits.get("global_limit")
        if not global_limit:
            # Without a global limit, no request limit is enforced at all.
            return True, None

        # [0] number of requests of this patron, grouped by library
        requests_by_library = get_loans_count_by_library_for_patron_pid(
            patron.pid, LoanState.REQUEST_STATES
        )

        # [1] global limit, all libraries together
        if sum(requests_by_library.values()) >= global_limit:
            return False, _("Request denied: maximum number of requests reached.")

        # [2] library limit, only relevant when an item is given
        if item:
            item_lib_pid = item.library_pid
            # The default library limit applies unless an exception is
            # defined for the library of the item (first match wins).
            library_limit_value = request_limits.get("library_limit")
            for exception in request_limits.get("library_exceptions", []):
                if exception["library"]["pid"] == item_lib_pid:
                    library_limit_value = exception["value"]
                    break
            if library_limit_value and item_lib_pid in requests_by_library:
                if requests_by_library[item_lib_pid] >= library_limit_value:
                    return False, _(
                        "Request denied: maximum number of requests for this library "
                        "reached."
                    )

        # [3] no problem detected, request is allowed
        return True, None
376

377
    def check_checkout_count_limit(self, patron, item=None):
        """Check if a patron reached the checkout limits.

        * check the global general limit (if exists).
        * check the library exception limit (if exists).
        * check the library default limit (if exists).
        :param patron: the patron who tries to execute the checkout.
        :param item: the item related to the loan (optional).
        :return a tuple of two values ::
          - True|False : to know if the check is success or not.
          - message(string) : the reason why the check fails.
        """
        limits = self.replace_refs().get("limits", {})
        checkout_limits = limits.get("checkout_limits", {})
        global_limit = checkout_limits.get("global_limit")
        if not global_limit:
            # Without a global limit, no checkout limit is enforced at all.
            return True, None

        # [0] number of on-loan items of this patron, grouped by library
        loans_by_library = get_loans_count_by_library_for_patron_pid(
            patron.pid, [LoanState.ITEM_ON_LOAN]
        )

        # [1] global limit, all libraries together
        if sum(loans_by_library.values()) >= global_limit:
            return False, _("Checkout denied: the maximal checkout number is reached.")

        # [2] library limit, only relevant when an item is given
        if item:
            item_lib_pid = item.library_pid
            # The default library limit applies unless an exception is
            # defined for the library of the item (first match wins).
            library_limit_value = checkout_limits.get("library_limit")
            for exception in checkout_limits.get("library_exceptions", []):
                if exception["library"]["pid"] == item_lib_pid:
                    library_limit_value = exception["value"]
                    break
            if library_limit_value and item_lib_pid in loans_by_library:
                if loans_by_library[item_lib_pid] >= library_limit_value:
                    return False, _(
                        "Checkout denied: the maximal checkout "
                        "number of items for this library is "
                        "reached."
                    )

        # [3] no problem detected, checkout is allowed
        return True, None
430

431
    def check_fee_amount_limit(self, patron):
        """Check that a patron is still under the fee amount limit.

        * check the fee amount limit (if exists).
        :param patron: the patron who tries to execute the operation.
        :return: False if the total of the patron's open overdue
                 transactions reaches or exceeds the limit. True otherwise,
                 including when no limit is configured.
        """
        limits = self.replace_refs().get("limits", {})
        default_limit = limits.get("fee_amount_limits", {}).get("default_value")
        if not default_limit:
            # No limit configured (or 0): nothing to check.
            return True
        # Total amount of the open 'overdue' transactions; subscription fees
        # are excluded.
        open_overdue_amount = get_transactions_total_amount_for_patron(
            patron.pid, status="open", types=["overdue"], with_subscription=False
        )
        return open_overdue_amount < default_limit
450

451
    def check_unpaid_subscription(self, patron):
        """Check that a patron has no unpaid subscription.

        The check is only performed when the 'unpaid_subscription' limit is
        enabled; it is considered enabled when the setting is missing.

        :param patron: the patron who tried to execute a circulation operation.
        :return: boolean - True if the limit is disabled or if the total
                 amount of the patron's open subscription transactions is
                 zero, False otherwise.
        """
        limit_enabled = self.get("limits", {}).get("unpaid_subscription", True)
        if not limit_enabled:
            # Return a plain boolean like the other path. The previous
            # `(True, None)` tuple only worked because a non-empty tuple is
            # truthy for the callers' `if not ...` tests.
            return True
        unpaid_amount = get_transactions_total_amount_for_patron(
            patron.pid, status="open", types=["subscription"], with_subscription=True
        )
        return unpaid_amount == 0
468

469

470
class PatronTypesIndexer(IlsRecordsIndexer):
    """Indexing class for patron type records."""

    record_cls = PatronType

    def bulk_index(self, record_id_iterator):
        """Bulk index patron type records.

        Delegates to the parent class with the "ptty" doc type.

        :param record_id_iterator: Iterator yielding record UUIDs.
        """
        super(PatronTypesIndexer, self).bulk_index(
            record_id_iterator, doc_type="ptty"
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc