• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rero / rero-mef / 16621609190

30 Jul 2025 11:43AM UTC coverage: 84.491% (+0.008%) from 84.483%
16621609190

push

github

rerowep
chore: update dependencies

Co-Authored-by: Peter Weber <peter.weber@rero.ch>

4560 of 5397 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.12
/rero_mef/api_mef.py
1
# RERO MEF
2
# Copyright (C) 2024 RERO
3
#
4
# This program is free software: you can redistribute it and/or modify
5
# it under the terms of the GNU Affero General Public License as published by
6
# the Free Software Foundation, version 3 of the License.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU Affero General Public License for more details.
12
#
13
# You should have received a copy of the GNU Affero General Public License
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
15

16
"""API for manipulating MEF records."""
17

18
from copy import deepcopy
1✔
19
from datetime import datetime, timezone
1✔
20

21
import click
1✔
22
from dateutil import parser
1✔
23
from elasticsearch_dsl import Q
1✔
24
from flask import current_app
1✔
25

26
from .api import Action, EntityRecord
1✔
27
from .utils import generate, get_entity_class, get_entity_search_class, progressbar
1✔
28

29

30
class EntityMefRecord(EntityRecord):
    """Mef entity class.

    Base class for MEF records. Concrete subclasses are expected to set the
    class attributes below and to provide ``entities`` (the entity names a
    MEF record can link to), which several methods iterate over.
    """

    minter = None
    fetcher = None
    provider = None
    model_cls = None
    viaf_cls = None
    search = None
    mef_type = ""

    def set_deleted(self):
        """Set deleted.

        Sets MEF deleted value from sources: the first source carrying a
        truthy ``deleted`` value wins. If no source is deleted, a stale
        ``deleted`` value on the MEF record is removed.

        :returns: True if the record was changed.
        """
        changed = False
        source_data = deepcopy(self).replace_refs()
        if sources := source_data["sources"]:
            for source in sources:
                # Read the flag from the resolved copy: ``self[source]`` is
                # assumed to hold only the unresolved ``$ref`` link, which
                # would never contain ``deleted``.
                if deleted := source_data[source].get("deleted"):
                    self["deleted"] = deleted
                    changed = True
                    break
            if not changed and self.get("deleted"):
                # Delete old deleted data
                self.pop("deleted")
                changed = True
        return changed

    def update(self, data, commit=False, dbcommit=False, reindex=False):
        """Update data for record.

        ``set_deleted`` is called on the current record before the update is
        delegated to the parent class.

        :param data: a dict data to update the record.
        :param commit: if True push the db transaction.
        :param dbcommit: make the change effective in db.
        :param reindex: reindex the record.
        :returns: the modified record
        """
        self.set_deleted()
        return super().update(
            data=data, commit=commit, dbcommit=dbcommit, reindex=reindex
        )

    @classmethod
    def get_mef(cls, entity_pid, entity_name, pid_only=False):
        """Get MEF record by entity pid value.

        An error is logged when more than one MEF record matches.

        :param entity_pid: Entity pid.
        :param entity_name: Name of entity (pid_type).
        :param pid_only: return pid only or the complete record.
        :returns: list of pids or records, sorted by ``_updated`` descending.
        """
        # VIAF pids are stored in a flat field, all others in `<name>.pid`.
        key = f"{entity_name}.pid"
        if entity_name == "viaf":
            key = "viaf_pid"
        query = (
            cls.search()
            .filter("term", **{key: entity_pid})
            .params(preserve_order=True)
            .sort({"_updated": {"order": "desc"}})
        )
        if pid_only:
            mef_records = [hit.pid for hit in (query.source(["pid"]).scan())]
        else:
            mef_records = [cls.get_record(hit.meta.id) for hit in query.scan()]
        if len(mef_records) > 1:
            mef_pids = mef_records if pid_only else [mef.pid for mef in mef_records]
            current_app.logger.error(
                f"MULTIPLE MEF FOUND FOR: {entity_name} {entity_pid} | "
                f"mef: {', '.join(mef_pids)}"
            )
        return mef_records

    @classmethod
    def get_all_pids_without_entities_and_viaf(cls):
        """Get all pids for records without entities and VIAF pids.

        :returns: Generator of MEF pids without entity links and without VIAF.
        """
        must_not = [Q("exists", field="viaf_pid")]
        must_not.extend(Q("exists", field=entity) for entity in cls.entities)
        query = cls.search().filter("bool", must_not=must_not)
        for hit in query.source("pid").scan():
            yield hit.pid

    @classmethod
    def get_all_pids_without_viaf(cls):
        """Get all pids for records without VIAF pid.

        :returns: Generator of MEF pids without VIAF pid.
        """
        query = cls.search().exclude("exists", field="viaf_pid")
        # NOTE(review): the config key here is `cls.mef_type`, whereas
        # `get_entities_pids` reads `RERO_<mef_type>` -- confirm which key is
        # intended. Each iteration adds a separate filter, so every
        # configured field has to exist for a record to match.
        for pid_type in current_app.config.get(cls.mef_type, []):
            query = query.filter("bool", should=[Q("exists", field=pid_type)])
        for hit in query.source("pid").scan():
            yield hit.pid

    @classmethod
    def get_multiple_missing_pids(cls, record_types=None, verbose=False):
        """Get entity pids with multiple MEF records.

        Only entities created up to the moment the MEF scan starts are
        compared, so entities created during the run are not reported.

        :param record_types: Record types (pid_types).
        :param verbose: Verbose.
        :returns: tuple ``(pids, multiple_pids, missing_pids, none_pids)``,
            each a dict keyed by record type:

            - ``pids``: entity pid -> list of MEF pids, for entity pids
              referenced by MEF that were not found in the entity index.
            - ``multiple_pids``: entity pid -> list of MEF pids, for entity
              pids referenced by more than one MEF record.
            - ``missing_pids``: list of entity pids not referenced by any
              MEF record.
            - ``none_pids``: list of MEF pids whose entity data has no pid.
        """
        pids = {}
        multiple_pids = {}
        missing_pids = {}
        none_pids = {}
        entities = {}
        sources = ["pid"]
        for record_type in record_types or []:
            try:
                entity_class = get_entity_class(record_type)
                entities[record_type] = {
                    "name": entity_class.name,
                    "search": get_entity_search_class(record_type)(),
                }
                pids[record_type] = {}
                multiple_pids[record_type] = {}
                missing_pids[record_type] = []
                none_pids[record_type] = []
                sources.append(f"{entity_class.name}.pid")
            except Exception:
                current_app.logger.error(f"Record type not found: {record_type}")

        # Get all pids from MEF
        date = datetime.now(timezone.utc)
        click.echo("Get mef")
        progress = progressbar(
            items=cls.search()
            .params(preserve_order=True)
            .sort({"_updated": {"order": "desc"}})
            .source(sources)
            .scan(),
            length=cls.search().count(),
            verbose=verbose,
        )
        for hit in progress:
            data = hit.to_dict()
            mef_pid = data["pid"]
            for record_type, info in entities.items():
                if entity_data := data.get(info["name"]):
                    if entity_pid := entity_data.get("pid"):
                        pids[record_type].setdefault(entity_pid, []).append(mef_pid)
                        if len(pids[record_type][entity_pid]) > 1:
                            multiple_pids[record_type][entity_pid] = pids[record_type][
                                entity_pid
                            ]
                    else:
                        none_pids[record_type].append(mef_pid)
        # Get all entities pids and compare with MEF pids
        for record_type, info in entities.items():
            click.echo(f"Get {info['name']} MEF: {len(pids[record_type])}")
            progress = progressbar(
                items=info["search"]
                .params(preserve_order=True)
                .sort({"pid": {"order": "asc"}})
                .filter("range", _created={"lte": date})
                .source("pid")
                .scan(),
                length=info["search"].filter("range", _created={"lte": date}).count(),
                verbose=verbose,
            )
            for hit in progress:
                pid = hit.pid
                # Matched pids are removed, so `pids` ends up holding only
                # the entity pids that MEF references but the index lacks.
                if not pids[record_type].pop(pid, None):
                    missing_pids[record_type].append(pid)
        return pids, multiple_pids, missing_pids, none_pids

    @classmethod
    def get_deleted(cls, missing_pids, from_date):
        """Get deleted records.

        :param missing_pids: iterable of MEF pids to check.
        :param from_date: ISO date string; when truthy, deleted records are
            only yielded if they were updated at or after this date.
        :returns: Generator of dicts with ``pid`` and, for deleted records,
            ``_created`` and ``_updated``.
        """
        # find deleted pids.
        for missing_pid in missing_pids:
            missing = {"pid": missing_pid}
            mef = cls.get_record_by_pid(missing_pid, with_deleted=True)
            if mef is None:
                yield missing
            elif mef == {}:
                # MEF was deleted!
                missing["_created"] = mef.created.isoformat()
                missing["_updated"] = mef.updated.isoformat()
                if from_date:
                    if mef.updated >= parser.isoparse(from_date):
                        yield missing
                else:
                    yield missing

    @classmethod
    def get_updated(cls, data):
        """Get updated and deleted MEF records.

        :param data: dict with the optional keys ``from_date`` (only records
            updated at or after this date), ``pids`` (restrict to these MEF
            pids) and ``resolve`` (when falsy only ``pid``, ``deleted``,
            ``_created`` and ``_updated`` are requested from the index).
        :returns: result of ``generate`` for the search and deleted records.
        """
        search = (
            cls.search().params(preserve_order=True).sort({"pid": {"order": "asc"}})
        )
        from_date = data.get("from_date")
        if from_date:
            search = search.filter("range", _updated={"gte": from_date})
        missing_pids = []
        if pids := data.get("pids"):
            search = search.filter("terms", pid=pids)
            # Requested pids that are absent from the index are handed to
            # `get_deleted` below.
            missing_pids.extend(
                pid for pid in pids if cls.search().filter("term", pid=pid).count() == 0
            )
        else:
            # Get all deleted pids. Errors propagate unchanged so callers
            # see the original exception type and traceback.
            missing_pids = cls.get_all_deleted_pids(from_date=from_date)

        if not data.get("resolve"):
            search = search.source(["pid", "deleted", "_created", "_updated"])
        deleted = cls.get_deleted(missing_pids, from_date)
        return generate(search, deleted)

    def delete_ref(self, record, dbcommit=False, reindex=False):
        """Delete $ref from record.

        :param record: Record to delete the $ref.
        :param dbcommit: Commit changes to DB.
        :param reindex: Reindex record.
        :returns: Modified record and executed action.
        """
        action = Action.DISCARD
        # Only persist when the MEF actually held a link for this entity.
        if self.pop(record.name, None):
            action = Action.DELETE
            self.replace(data=self, dbcommit=dbcommit, reindex=reindex)
            if reindex:
                self.flush_indexes()
        return self, action

    @property
    def ref_pids(self):
        """Get ref pids from $ref.

        :returns: dict mapping entity name to the last path segment of its
            ``$ref`` URL.
        """
        ref_pids = {}
        for entity_name in self.entities:
            if ref := self.get(entity_name):
                ref_pids[entity_name] = ref["$ref"].split("/")[-1]
        return ref_pids

    def get_entities_pids(self):
        """Get entities pids.

        :returns: list of dicts with ``record_class`` and ``pid`` for every
            configured entity type present in this record.
        """
        entities = []
        entity_types = current_app.config.get(f"RERO_{self.mef_type}", [])
        for entity_type in entity_types:
            record_class = get_entity_class(entity_type)
            name = record_class.name
            if name in self:
                entities.append(
                    {
                        "record_class": record_class,
                        # Get pid from $ref URL
                        "pid": self.get(name).get("$ref").split("/")[-1],
                    }
                )
        return entities

    def get_entities_records(self):
        """Get entities records.

        :returns: list of the linked entity records that could be loaded;
            links whose record is not found are skipped.
        """
        entities_records = []
        for entity in self.get_entities_pids():
            record_class = entity["record_class"]
            if entity_record := record_class.get_record_by_pid(entity["pid"]):
                entities_records.append(entity_record)
        return entities_records
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc