• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rero / sonar / 5576946992

pending completion
5576946992

Pull #929

github

web-flow
Merge 63e01a8fb into d0f35c594
Pull Request #929: URN: redirect to the right URL

192 of 192 new or added lines in 9 files covered. (100.0%)

7994 of 8284 relevant lines covered (96.5%)

1.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.0
/sonar/modules/documents/urn.py
1
# -*- coding: utf-8 -*-
2
#
3
# Swiss Open Access Repository
4
# Copyright (C) 2022 RERO
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as published by
8
# the Free Software Foundation, version 3 of the License.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17

18
"""Urn API."""
2✔
19

20
from datetime import datetime, timedelta, timezone
2✔
21

22
from flask import current_app
2✔
23
from invenio_db import db
2✔
24
from invenio_pidstore.errors import PIDAlreadyExists
2✔
25
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
2✔
26

27
from sonar.modules.documents.models import UrnIdentifier
2✔
28

29

30
class URNAlreadyRegisterd(Exception):
    """Error signalling that a URN identifier is already registered."""
32

33

34

35
class Urn:
2✔
36
    """Urn class."""
37

38
    urn_pid_type = 'urn'
2✔
39

40
    @classmethod
2✔
41
    def _calculateCheckDigit(cls, urn):
2✔
42
        """Return the check-digit calculated on a URN.
43

44
        :param urn: the urn identifier.
45
        """
46
        # For details on the algorithm, see:
47
        # https://d-nb.info/1045320641/34
48
        # set up conversion table
49
        conversion_table = {
2✔
50
            '0':'1', '1':'2', '2':'3', '3':'4', '4':'5', '5':'6', '6':'7',
51
            '7':'8', '8':'9', '9':'41', 'A':'18', 'B':'14', 'C':'19', 'D':'15',
52
            'E':'16', 'F':'21', 'G':'22', 'H':'23', 'I':'24', 'J':'25',
53
            'K':'42', 'L':'26', 'M':'27', 'N':'13', 'O':'28', 'P':'29',
54
            'Q':'31', 'R':'12', 'S':'32', 'T':'33', 'U':'11', 'V':'34',
55
            'W':'35', 'X':'36', 'Y':'37', 'Z':'38', '-':'39', ':':'17',
56
            '_':'43', '.':'47', '/':'45','+':'49'
57
            }
58
        # obtain digit sequence using the conversion table by concatenating
59
        # chars
60
        digit_sequence = ''
2✔
61
        for idx, element in enumerate(urn, 0):
2✔
62
            digits = conversion_table[urn[idx].upper()]
2✔
63
            digit_sequence = digit_sequence + digits
2✔
64
        # calculate product sum
65
        product_sum = 0
2✔
66
        for idx, element in enumerate(digit_sequence, 0):
2✔
67
            product_sum = product_sum + ((idx+1) * int(digit_sequence[idx]))
2✔
68
        # read the last number of the digit sequence and calculate the quotient
69
        last_number = int(digit_sequence[-1])
2✔
70
        quotient = int(product_sum/last_number)
2✔
71
        quotient_string = str(quotient)
2✔
72
        # read the check-digit from the quotient value string
73
        return quotient_string[-1]
2✔
74

75
    @classmethod
    def _generate_urn(cls, pid, config):
        """Build the complete URN, check-digit included.

        The URN is made of the base URN read from the application
        configuration (``SONAR_APP_URN_DNB_BASE_URN``), the ``code`` of the
        organisation configuration formatted with the ``03`` format
        specification, a dash, the pid and finally the check-digit computed
        on all the preceding characters.

        :param pid: the pid of the urn identifier.
        :param config: organisation related configuration.
        :returns: the URN as a string.
        """
        prefix = current_app.config.get("SONAR_APP_URN_DNB_BASE_URN")
        organisation_code = config.get("code")
        urn_without_check = f'{prefix}{organisation_code:03}-{pid}'
        check_digit = cls._calculateCheckDigit(urn_without_check)
        return f'{urn_without_check}{check_digit}'
2✔
85

86
    @classmethod
    def create_urn(cls, record):
        """Create the URN identifier.

        Nothing is created when ``DocumentRecord.get_rero_urn_code`` already
        finds a URN on the record, when the organisation of the record has
        no entry in the ``SONAR_APP_DOCUMENT_URN`` configuration or when the
        document type of the record is not listed for that organisation.

        Otherwise the next value of the URN sequence is used to build the
        URN, a persistent identifier with the ``RESERVED`` status is created
        for it and the URN is added to the ``identifiedBy`` field of the
        record. The record is modified in place.

        :param record: the invenio record instance to be processed.
        :returns: the created persistent identifier, or ``None`` when no
            URN has been created.
        """
        from sonar.modules.documents.api import DocumentRecord
        urn_config = current_app.config.get("SONAR_APP_DOCUMENT_URN")
        org_pid = record.resolve().get("organisation", [{}])[0].get("pid")
        if DocumentRecord.get_rero_urn_code(record):
            current_app.logger.warning(
                    f'generated urn already exist for document: {record["pid"]}')
            return None

        config = urn_config.get("organisations", {}).get(org_pid)
        if not config:
            # The organisation is not configured to get URNs.
            return None
        if record.get("documentType") not in config.get("types"):
            return None

        urn_code = cls._generate_urn(int(UrnIdentifier.next()), config)
        # Only the creation of the identifier is guarded: it is the single
        # call expected to raise ``PIDAlreadyExists``.
        try:
            pid = PersistentIdentifier.create(
                cls.urn_pid_type,
                urn_code,
                object_type="rec",
                object_uuid=record.id,
                status=PIDStatus.RESERVED,
            )
        except PIDAlreadyExists:
            current_app.logger.error(
                f'generated urn already exist for document: {record["pid"]}')
            return None

        urn_identifier = {"type": "bf:Urn", "value": urn_code}
        if "identifiedBy" in record:
            record["identifiedBy"].append(urn_identifier)
        else:
            record["identifiedBy"] = [urn_identifier]
        return pid
122

123
    @classmethod
    def _urn_query(cls, status=None):
        """Build the base query selecting the URN persistent identifiers.

        :param status: PID status to match; the value is passed unchanged to
            the status filter (``None`` by default).
        :returns: Base query filtered on the URN pid type and on the status.
        """
        urn_identifiers = PersistentIdentifier.query.filter_by(
            pid_type=cls.urn_pid_type)
        return urn_identifiers.filter_by(status=status)
133

134

135
    @classmethod
    def get_urn_pids(cls, status=PIDStatus.RESERVED, days=None):
        """Get the documents linked to a URN pid having the given status.

        :param status: PID status of the URN identifiers to consider
            (``PIDStatus.RESERVED`` by default).
        :param days: if set, keep only the documents whose creation date is
            not older than this number of days.
        :returns: a tuple made of the number of matching documents and of an
            iterable over their pids. ``(0, [])`` is returned when no URN
            identifier has the given status.
        """
        identifiers = cls._urn_query(status=status).all()
        document_uuids = [str(item.object_uuid) for item in identifiers]
        if not document_uuids:
            return 0, []

        from sonar.modules.documents.api import DocumentSearch

        search = DocumentSearch().filter('terms', _id=document_uuids)
        if days:
            oldest = datetime.now(timezone.utc) - timedelta(days=days)
            search = search.filter('range', _created={'gte': oldest})

        def iter_pids(search):
            """Lazily yield the pid of every matching document."""
            for hit in search.source('pid').scan():
                yield hit.pid

        return search.count(), iter_pids(search)
2✔
156

157

158
    @classmethod
    def get_unregistered_urns(cls):
        """Get the URN codes that are reserved but not registered yet.

        :returns: List of the pid values, as strings, of the URN persistent
            identifiers having the ``RESERVED`` status.
        """
        reserved = cls._urn_query(status=PIDStatus.RESERVED).all()
        return [str(identifier.pid_value) for identifier in reserved]
2✔
166

167
    @classmethod
    def register_urn_code_from_document(cls, record):
        """Register the urn pid for a given document.

        The document is submitted through
        ``DnbUrnService.register_document``; when that call returns a truthy
        value the local persistent identifier is marked as registered and
        the database session is committed.

        :param record: The document.
        :returns: ``True`` if the URN has been registered by this call,
            ``False`` if the document has no URN code, if its URN is already
            registered or if ``DnbUrnService.register_document`` returned a
            falsy value.
        """
        from sonar.modules.documents.api import DocumentRecord
        from sonar.modules.documents.dnb import DnbUrnService

        urn_code = DocumentRecord.get_rero_urn_code(record)
        if not urn_code:
            return False
        pid = PersistentIdentifier.get(cls.urn_pid_type, urn_code)
        if pid.is_registered():
            current_app.logger.warning(
                f'URN {urn_code} is already registered for the document: '
                f'{record["pid"]}'
            )
            return False
        if not DnbUrnService.register_document(record):
            return False
        # Mark the identifier as registered only once the service accepted
        # the document.
        pid.register()
        db.session.commit()
        return True
×
191

192
    @classmethod
    def get_documents_to_generate_urns(cls):
        """Get documents that need a URN code.

        For every organisation listed in the ``SONAR_APP_DOCUMENT_URN``
        configuration, the documents of that organisation having one of the
        configured document types and no ``bf:Urn`` entry in
        ``identifiedBy`` are searched. Each matching document is yielded
        once, in no particular order.

        :returns: generator of document records.
        """
        from elasticsearch_dsl import Q

        from sonar.modules.documents.api import DocumentRecord, DocumentSearch
        urn_config = current_app.config.get("SONAR_APP_DOCUMENT_URN")
        # A set removes the duplicated pids while they are collected.
        pids = set()
        for org_pid, config in urn_config.get('organisations', {}).items():
            has_urn = Q(
                'nested',
                path='identifiedBy',
                query=Q('term', identifiedBy__type='bf:Urn'))
            query = DocumentSearch()\
                .filter('terms', documentType=config.get('types'))\
                .filter('term', organisation__pid=org_pid)\
                .filter('bool', must_not=[has_urn])\
                .source(['pid'])
            pids.update(hit.pid for hit in query.scan())

        for pid in pids:
            yield DocumentRecord.get_record_by_pid(pid)
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc