• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rero / rero-mef / 16621609190

30 Jul 2025 11:43AM UTC coverage: 84.491% (+0.008%) from 84.483%
16621609190

push

github

rerowep
chore: update dependencies

Co-Authored-by: Peter Weber <peter.weber@rero.ch>

4560 of 5397 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.44
/rero_mef/api.py
1
# RERO MEF
2
# Copyright (C) 2020 RERO
3
#
4
# This program is free software: you can redistribute it and/or modify
5
# it under the terms of the GNU Affero General Public License as published by
6
# the Free Software Foundation, version 3 of the License.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU Affero General Public License for more details.
12
#
13
# You should have received a copy of the GNU Affero General Public License
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
15
"""API for manipulating records."""
16

17
from copy import deepcopy
1✔
18
from enum import Enum
1✔
19
from uuid import uuid4
1✔
20

21
from celery import current_app as current_celery_app
1✔
22
from elasticsearch.exceptions import NotFoundError
1✔
23
from elasticsearch.helpers import bulk
1✔
24
from flask import current_app
1✔
25
from invenio_db import db
1✔
26
from invenio_indexer.api import RecordIndexer
1✔
27
from invenio_pidstore.errors import PIDDoesNotExistError
1✔
28
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
1✔
29
from invenio_records.api import Record
1✔
30
from invenio_records_rest.utils import obj_or_import_string
1✔
31
from invenio_search import current_search
1✔
32
from invenio_search.engine import search
1✔
33
from kombu.compat import Consumer
1✔
34
from sqlalchemy import func, text
1✔
35
from sqlalchemy.exc import OperationalError
1✔
36
from sqlalchemy.orm.exc import NoResultFound
1✔
37

38
from rero_mef.utils import build_ref_string, get_entity_class
1✔
39

40
from .utils import add_md5, add_schema
1✔
41

42
# Map each supported ``bf:`` entity type to the entity type segment that is
# passed to ``build_ref_string`` when building ``$ref`` values.
RERO_ILS_ENTITY_TYPES = {
    **dict.fromkeys(("bf:Person", "bf:Organisation"), "agents"),
    **dict.fromkeys(("bf:Topic", "bf:Temporal"), "concepts"),
    "bf:Place": "places",
}
49

50

51
class Action(Enum):
    """Outcome of an entity record operation.

    Values are returned (alone or in ``{pid: Action}`` mappings) by the
    record create/update helpers so callers can report what happened.
    NOTE(review): several members (e.g. NOT_ONLINE, NOT_FOUND, DISCARD) are
    not used in this module; their exact semantics are defined by callers.
    """

    CREATE = "create"
    UPDATE = "update"
    REPLACE = "replace"
    # Returned when the MD5 of the new data equals the stored one.
    UPTODATE = "uptodate"
    DISCARD = "discard"
    DELETE = "delete"
    ALREADY_DELETED = "already deleted"
    # An entity reference was removed from a MEF record.
    DELETE_ENTITY = "delete entity"
    VALIDATION_ERROR = "validation error"
    ERROR = "error"
    NOT_ONLINE = "not online"
    NOT_FOUND = "not found"
66

67

68
class EntityRecordError:
    """Namespace grouping the exceptions raised by entity records.

    This class is not itself an exception; each nested class is an
    independent ``Exception`` subclass.
    """

    class Deleted(Exception):
        """Record is deleted."""

    class NotDeleted(Exception):
        """Record is not deleted."""

    class PidMissing(Exception):
        """Record pid missing."""

    class PidChange(Exception):
        """Record pid change."""

    class PidAlradyUsed(Exception):
        """Record pid already used."""

    # Correctly spelled alias; the misspelled name is kept for callers that
    # already raise or catch it.
    PidAlreadyUsed = PidAlradyUsed

    class PidDoesNotExist(Exception):
        """Pid does not exist."""

    class DataMissing(Exception):
        """Data missing in record."""
91

92

93
class EntityRecord(Record):
    """Entity Record class.

    Base class for provider specific entity records. Subclasses are expected
    to set ``minter``, ``fetcher``, ``provider`` and ``name``, and to provide
    the ``search`` class used by :meth:`flush_indexes`.
    """

    minter = None
    fetcher = None
    provider = None
    object_type = "rec"
    name = None

    @classmethod
    def flush_indexes(cls):
        """Flush and refresh the search index of this record class.

        Errors are logged and swallowed so that a failing refresh does not
        abort the calling operation.
        """
        try:
            current_search.flush_and_refresh(cls.search.Meta.index)
        except Exception as err:
            current_app.logger.error(f"ERROR flush and refresh: {err}")

    @classmethod
    def create(
        cls,
        data,
        id_=None,
        delete_pid=False,
        dbcommit=False,
        reindex=False,
        md5=False,
        **kwargs,
    ):
        """Create a new entity record.

        :param data: record data (``$schema`` is added when missing).
        :param id_: record UUID; a new one is generated when not given.
        :param delete_pid: remove ``pid`` from ``data`` before minting.
        :param dbcommit: commit the DB session after creation.
        :param reindex: reindex the record (only effective with dbcommit).
        :param md5: add an ``md5`` checksum to the data after minting.
        :param kwargs: passed through to ``Record.create``.
        :returns: the created record.
        """
        assert cls.minter
        if "$schema" not in data:
            data = add_schema(data, cls.provider.pid_type)
        if delete_pid:
            data.pop("pid", None)
        if not id_:
            id_ = uuid4()
        cls.minter(id_, data)
        if md5:
            data = add_md5(data)
        record = super().create(data=data, id_=id_, **kwargs)
        if dbcommit:
            record.dbcommit(reindex)
        return record

    @classmethod
    def create_or_update(
        cls,
        data,
        id_=None,
        delete_pid=True,
        dbcommit=False,
        reindex=False,
        test_md5=False,
    ):
        """Create or update an entity record.

        When a record with ``data["pid"]`` exists it is replaced (or, with
        ``test_md5``, replaced only if its MD5 changed); otherwise a new
        record is created.

        NOTE: ``id_`` and ``delete_pid`` are accepted for interface
        compatibility but are not used: creation always calls
        :meth:`create` with ``id_=None`` and ``delete_pid=False``.

        :param data: record data, expected to contain ``pid``.
        :param dbcommit: commit the DB session.
        :param reindex: reindex the record and refresh the index.
        :param test_md5: only replace an existing record if its MD5 differs.
        :returns: tuple (record, or the input data on error; Action).
        """
        action = Action.ERROR
        return_record = data

        pid = data.get("pid")
        if agent_record := cls.get_record_by_pid(pid):
            # record exists
            data = add_schema(data, agent_record.provider.pid_type)
            # Keep these fields from the stored record when the new data
            # does not provide them.
            copy_fields = [
                "pid",
                "$schema",
                "identifiedBy",
                "authorized_access_point",
                "type",
                "relation_pid",
                "deleted",
            ]
            original_data = {k: v for k, v in agent_record.items() if k in copy_fields}
            # dict merging: `original_data` values are overridden by `data`.
            data = {**original_data, **data}
            if test_md5:
                return_record, action = agent_record.update_md5_changed(
                    data=data, dbcommit=dbcommit, reindex=reindex
                )
            else:
                return_record = agent_record.replace(
                    data=data, dbcommit=dbcommit, reindex=reindex
                )
                action = Action.UPDATE
        else:
            try:
                return_record = cls.create(
                    data=data,
                    id_=None,
                    delete_pid=False,
                    dbcommit=dbcommit,
                    reindex=reindex,
                )
                action = Action.CREATE
            except Exception as err:
                current_app.logger.error(
                    f"ERROR create_or_update {cls.name} {data.get('pid')} {err}"
                )
                action = Action.ERROR
        if reindex:
            cls.flush_indexes()
        return return_record, action

    def delete(self, force=True, dbcommit=False, delindex=False):
        """Delete record and persistent identifier.

        :param force: also remove the persistent identifier row and hard
            delete the record.
        :param dbcommit: commit the DB session.
        :param delindex: remove the record from the index and refresh it.
        :returns: result of ``Record.delete``.
        """
        persistent_identifier = self.get_persistent_identifier(self.id)
        persistent_identifier.delete()
        if force:
            db.session.delete(persistent_identifier)
        result = super().delete(force=force)
        if dbcommit:
            self.dbcommit()
        if delindex:
            self.delete_from_index()
            self.flush_indexes()
        return result

    def update(self, data, commit=False, dbcommit=False, reindex=False):
        """Update data for record.

        :param data: a dict data to update the record.
        :param commit: if True push the db transaction.
        :param dbcommit: make the change effective in db.
        :param reindex: reindex the record.
        :returns: the modified record
        """
        # Keep the MD5 checksum up to date if the record already has one.
        if self.get("md5"):
            data = add_md5(data)
        super().update(data)
        if commit or dbcommit:
            self.commit()
        if dbcommit:
            self.dbcommit(reindex)
        return self

    def update_md5_changed(self, data, dbcommit=False, reindex=False):
        """Replace the record only if the MD5 of ``data`` differs.

        :param data: Data to test MD5 changes.
        :param dbcommit: Commit changes to DB.
        :param reindex: Reindex record.
        :returns: tuple (record, Action.UPTODATE or Action.UPDATE).
        """
        data = deepcopy(data)
        data = add_md5(data)
        # Distinct defaults so that two missing checksums never compare equal.
        if data.get("md5", "data") == self.get("md5", "agent"):
            # record has no changes
            return self, Action.UPTODATE
        return_record = self.replace(data=data, dbcommit=dbcommit, reindex=reindex)
        return return_record, Action.UPDATE

    def replace(self, data, commit=False, dbcommit=False, reindex=False):
        """Replace all data of the record.

        :param data: the new record data; must contain ``pid``.
        :param commit: push the db transaction.
        :param dbcommit: make the change effective in db.
        :param reindex: reindex the record.
        :returns: the modified record.
        :raises EntityRecordError.PidMissing: if ``data`` has no pid.
        """
        # Copy first: ``data`` may be this record itself, which is cleared below.
        new_data = deepcopy(data)
        pid = new_data.get("pid")
        if not pid:
            raise EntityRecordError.PidMissing(f"missing pid={self.pid}")
        if self.get("md5"):
            new_data = add_md5(new_data)
        self.clear()
        return self.update(
            data=new_data, commit=commit, dbcommit=dbcommit, reindex=reindex
        )

    def dbcommit(self, reindex=False, forceindex=False):
        """Commit changes to db and optionally reindex the record."""
        db.session.commit()
        if reindex:
            self.reindex(forceindex=forceindex)

    def reindex(self, forceindex=False):
        """Reindex record.

        :param forceindex: index with ``version_type="external_gte"``.
        """
        indexer = self.get_indexer_class()
        if forceindex:
            return indexer(version_type="external_gte").index(self)
        return indexer().index(self)

    @classmethod
    def get_record_by_pid(cls, pid, with_deleted=False):
        """Get a record by its pid value.

        Database ``OperationalError``s are logged, the session is rolled
        back and the lookup is retried up to 5 times in total.

        :param pid: pid value.
        :param with_deleted: also return soft deleted records.
        :returns: the record, or None if not found or all attempts failed.
        """
        assert cls.provider
        for attempt in range(1, 6):
            try:
                persistent_identifier = PersistentIdentifier.get(
                    cls.provider.pid_type, pid
                )
                return super().get_record(
                    persistent_identifier.object_uuid, with_deleted=with_deleted
                )
            except (PIDDoesNotExistError, NoResultFound):
                return None
            except OperationalError:
                msg = f"Get record OperationalError: {attempt} {pid}"
                current_app.logger.error(msg)
                db.session.rollback()
        return None

    @classmethod
    def get_pid_by_id(cls, id_):
        """Get pid value (as string) by record uuid."""
        persistent_identifier = cls.get_persistent_identifier(id_)
        return str(persistent_identifier.pid_value)

    @classmethod
    def get_persistent_identifier(cls, id_):
        """Get the Persistent Identifier of the record with uuid ``id_``."""
        return PersistentIdentifier.get_by_object(
            cls.provider.pid_type, cls.object_type, id_
        )

    @classmethod
    def _get_all(cls, with_deleted=False, date=None):
        """Build a query on the persistent identifiers of this pid type.

        :param with_deleted: include non REGISTERED identifiers.
        :param date: only identifiers created before this date.
        :returns: SQLAlchemy query.
        """
        query = PersistentIdentifier.query.filter_by(pid_type=cls.provider.pid_type)
        if not with_deleted:
            query = query.filter_by(status=PIDStatus.REGISTERED)
        if date:
            query = query.filter(PersistentIdentifier.created < date)
        return query

    @classmethod
    def get_all_pids(cls, with_deleted=False, limit=100000, date=None):
        """Get all records pids. Return a generator iterator.

        :param limit: page size; a falsy value loads everything in one query.
        """
        query = cls._get_all(with_deleted=with_deleted, date=date)
        if limit:
            # slower, less memory
            query = query.order_by(text("pid_value")).limit(limit)
            offset = 0
            count = cls.count(with_deleted=with_deleted)
            while offset < count:
                for identifier in query.offset(offset):
                    yield identifier.pid_value
                offset += limit
        else:
            # faster, more memory
            for identifier in query:
                yield identifier.pid_value

    @classmethod
    def get_all_deleted_pids(cls, limit=100000, from_date=None):
        """Get all deleted records pids. Return a generator iterator.

        :param limit: page size; a falsy value loads everything in one query.
        :param from_date: only pids updated on or after this date.
        """
        query = PersistentIdentifier.query.filter_by(
            pid_type=cls.provider.pid_type
        ).filter_by(status=PIDStatus.DELETED)
        if from_date:
            query = query.filter(func.DATE(PersistentIdentifier.updated) >= from_date)
        if limit:
            # slower, less memory
            count = query.count()
            query = query.order_by(text("pid_value")).limit(limit)
            offset = 0
            while offset < count:
                for identifier in query.offset(offset):
                    yield identifier.pid_value
                offset += limit
        else:
            # faster, more memory
            for identifier in query:
                yield identifier.pid_value

    @classmethod
    def get_all_ids(cls, with_deleted=False, limit=100000, date=None):
        """Get all records uuids. Return a generator iterator.

        :param limit: page size; a falsy value loads everything in one query.
        """
        query = cls._get_all(with_deleted=with_deleted, date=date)
        if limit:
            # slower, less memory
            query = query.order_by(text("pid_value")).limit(limit)
            offset = 0
            count = cls.count(with_deleted=with_deleted)
            while offset < count:
                for identifier in query.offset(offset):
                    yield identifier.object_uuid
                offset += limit
        else:
            # faster, more memory
            for identifier in query:
                yield identifier.object_uuid

    @classmethod
    def get_all_records(cls, with_deleted=False, limit=100000):
        """Get all records. Return a generator iterator.

        :param with_deleted: also yield soft deleted records.
        """
        for id_ in cls.get_all_ids(with_deleted=with_deleted, limit=limit):
            # Pass ``with_deleted`` so soft deleted records do not raise
            # NoResultFound when they were explicitly requested.
            yield cls.get_record(id_, with_deleted=with_deleted)

    @classmethod
    def count(cls, with_deleted=False):
        """Get record count.

        Database ``OperationalError``s are logged, the session is rolled
        back and the count is retried up to 5 times in total.

        :raises OperationalError: the last error if every attempt failed.
        """
        last_error = None
        for attempt in range(1, 6):
            try:
                return cls._get_all(with_deleted=with_deleted).count()
            except OperationalError as err:
                last_error = err
                msg = f"Get count OperationalError: {attempt}"
                current_app.logger.error(msg)
                db.session.rollback()
        # Re-raise the real error: OperationalError cannot be built from a
        # single message argument.
        raise last_error

    @classmethod
    def index_all(cls):
        """Index all records and return the number of indexed records."""
        ids = cls.get_all_ids()
        return cls.index_ids(ids)

    @classmethod
    def index_ids(cls, ids):
        """Index the records with the given uuids and return their number."""
        count = 0
        for uuid in ids:
            count += 1
            RecordIndexer().index(cls.get_record(uuid))
        return count

    @classmethod
    def get_indexer_class(cls):
        """Get the indexer from config.

        Falls back to :class:`EntityIndexer` (and logs an error) when no
        ``indexer_class`` is configured for this pid type.
        """
        try:
            indexer = obj_or_import_string(
                current_app.config["RECORDS_REST_ENDPOINTS"][cls.provider.pid_type][
                    "indexer_class"
                ]
            )
        except Exception:
            # provide default indexer if no indexer is defined in config.
            indexer = EntityIndexer
            current_app.logger.error(f"Get indexer class {cls.__name__}")
        return indexer

    def delete_from_index(self):
        """Delete record from index; a missing document is only logged."""
        indexer = self.get_indexer_class()
        try:
            indexer().delete(self)
        except NotFoundError:
            current_app.logger.warning(
                f"Can not delete from index {self.__class__.__name__}: {self.pid}"
            )

    @property
    def pid(self):
        """Get record pid value."""
        return self.get("pid")

    @property
    def persistent_identifier(self):
        """Get Persistent Identifier."""
        return self.get_persistent_identifier(self.id)

    @classmethod
    def get_metadata_identifier_names(cls):
        """Get metadata table name and pid identifier name."""
        metadata = cls.model_cls.__tablename__
        identifier = cls.provider.pid_identifier
        return metadata, identifier

    @property
    def deleted(self):
        """Get record deleted value."""
        return self.get("deleted")
462

463

464
class ConceptPlaceRecord(EntityRecord):
    """Base class for concept and place entity records.

    Subclasses must implement :attr:`association_identifier` and
    :meth:`get_online_record`. :meth:`create_or_update_mef` reads
    ``self.association_info``, a dict with ``mef_cls``, ``record`` and
    ``record_cls`` keys (presumably provided by subclasses — not defined here).
    """

    def get_association_record(self, association_cls, association_search):
        """Get associated record.

        The association is made through the ``_association_identifier``
        search field. A record is returned only when this record's
        identifier is unique for its own class and matches exactly one
        record of the associated class.

        :params association_cls: Association class
        :params association_search: Association search class.
        :returns: Associated record or None.
        """
        association_identifier = self.association_identifier
        if not association_identifier:
            return None
        # Test if my identifier is unique
        count = (
            self.search()
            .filter("term", _association_identifier=association_identifier)
            .count()
        )
        if count > 1:
            current_app.logger.error(
                f"MULTIPLE IDENTIFIERS FOUND FOR: {self.name} {self.pid} "
                f"| {association_identifier}"
            )
            return None
        # Get associated record
        query = association_search().filter(
            "term", _association_identifier=association_identifier
        )
        # Ask the search engine for the count only once.
        association_count = query.count()
        if association_count > 1:
            current_app.logger.error(
                f"MULTIPLE ASSOCIATIONS IDENTIFIERS FOUND FOR: {self.name} {self.pid} "
                f"| {association_identifier}"
            )
        elif association_count == 1:
            hit = next(query.source("pid").scan())
            return association_cls.get_record_by_pid(hit.pid)
        return None

    @property
    def association_identifier(self):
        """Get the identifier used to associate this record.

        Has to be overloaded in concept/place class.
        """
        raise NotImplementedError()

    def create_or_update_mef(self, dbcommit=False, reindex=False):
        """Create or update MEF and VIAF record.

        :param dbcommit: Commit changes to DB.
        :param reindex: Reindex record.
        :returns: MEF record, dict of {MEF pid: Action}
        """

        def mef_create(mef_cls, data, association, dbcommit, reindex):
            """Create a MEF record referencing ``data``.

            :param association: association info dict; its ``record`` entry,
                if any, is also referenced by the new MEF record.
            :returns: MEF record, {MEF pid: Action.CREATE}
            """
            mef_data = {
                data.name: {
                    "$ref": build_ref_string(
                        entity_type=RERO_ILS_ENTITY_TYPES[data["type"]],
                        entity_name=data.name,
                        entity_pid=data.pid,
                    )
                },
                "type": data["type"],
            }
            if deleted := data.get("deleted"):
                mef_data["deleted"] = deleted
            if association_record := association.get("record"):
                ref = build_ref_string(
                    entity_type=RERO_ILS_ENTITY_TYPES[association_record["type"]],
                    entity_name=association_record.name,
                    entity_pid=association_record.pid,
                )
                mef_data[association_record.name] = {"$ref": ref}

            mef_record = mef_cls.create(
                data=mef_data,
                dbcommit=dbcommit,
                reindex=reindex,
            )
            return mef_record, {mef_record.pid: Action.CREATE}

        def get_mef_record(mef_cls, name, pid):
            """Get the unique MEF record referencing ``name``/``pid``.

            :returns: the MEF record, or None if none or several were found
                (several are logged as an error).
            """
            mef_records = mef_cls.get_mef(entity_name=name, entity_pid=pid)
            if len(mef_records) > 1:
                mef_pids = [mef_record.pid for mef_record in mef_records]
                current_app.logger.error(
                    f"MULTIPLE MEF FOUND FOR: {name} {pid} | mef: {', '.join(mef_pids)}"
                )
            if len(mef_records) == 1:
                return mef_records[0]
            return None

        association_info = self.association_info
        # Get direct MEF record
        mef_record = get_mef_record(
            mef_cls=association_info["mef_cls"], name=self.name, pid=self.pid
        )
        # Get associated MEF record
        mef_associated_record = {}
        if associated_record := association_info["record"]:
            # Get MEF record for the associated record.
            mef_associated_record = get_mef_record(
                mef_cls=association_info["mef_cls"],
                name=associated_record.name,
                pid=associated_record.pid,
            )
        new_mef_record = mef_record or mef_associated_record

        actions = {}
        if not mef_record and not mef_associated_record:
            mef_record, actions = mef_create(
                mef_cls=association_info["mef_cls"],
                data=self,
                association=association_info,
                dbcommit=dbcommit,
                reindex=reindex,
            )
        else:
            mef_pids = mef_record.ref_pids if mef_record else {}
            mef_association_pids = (
                mef_associated_record.ref_pids if mef_associated_record else {}
            )
            association_name = association_info["record_cls"].name
            mef_self_pid = mef_pids.get(self.name)
            mef_self_association_pid = mef_association_pids.get(self.name)
            mef_other_pid = mef_pids.get(association_name)
            mef_other_association_pid = mef_association_pids.get(association_name)

            # New ref
            if not new_mef_record.get(self.name):
                # Add new ref
                new_mef_record[self.name] = {
                    "$ref": build_ref_string(
                        entity_type=RERO_ILS_ENTITY_TYPES[self["type"]],
                        entity_name=self.name,
                        entity_pid=self.pid,
                    )
                }
            if (
                not bool(mef_self_association_pid)
                and not bool(mef_other_association_pid)
                and bool(mef_other_pid)
            ):
                # Delete associated ref from MEF and create a new one
                new_mef_record.pop(association_name)
                if association_record := association_info[
                    "record_cls"
                ].get_record_by_pid(mef_other_pid):
                    _, action = mef_create(
                        mef_cls=association_info["mef_cls"],
                        data=association_record,
                        association={},
                        dbcommit=dbcommit,
                        reindex=reindex,
                    )
                    actions |= action
            if (
                bool(mef_self_pid)
                and not bool(mef_self_association_pid)
                and not bool(mef_other_pid)
                and bool(mef_other_association_pid)
            ):
                # Delete entity from old MEF and add it to new MEF
                ref = mef_associated_record.pop(association_name)
                associated_mef_record = mef_associated_record.replace(
                    data=mef_associated_record, dbcommit=dbcommit, reindex=reindex
                )
                actions[associated_mef_record.pid] = Action.DELETE_ENTITY
                new_mef_record[association_name] = ref
            if (
                bool(mef_self_pid)
                and not bool(mef_self_association_pid)
                and bool(mef_other_pid)
                and bool(mef_other_association_pid)
            ):
                # Delete entity from new MEF and add it to old MEF
                ref = new_mef_record.pop(self.name)
                new_mef_record.replace(
                    data=new_mef_record, dbcommit=dbcommit, reindex=reindex
                )
                actions[new_mef_record.pid] = Action.DELETE_ENTITY
                mef_associated_record[self.name] = ref
                new_mef_record = mef_associated_record

            mef_record = new_mef_record.replace(
                data=new_mef_record, dbcommit=dbcommit, reindex=reindex
            )
            actions[mef_record.pid] = Action.UPDATE

        association_info["mef_cls"].flush_indexes()
        return mef_record, actions

    @classmethod
    def get_online_record(cls, id_, debug=False):
        """Get online Record.

        :param id_: Id of online record.
        :param debug: Debug print.
        :returns: record or None
        Has to be overloaded in concept/place class.
        """
        raise NotImplementedError()
679

680

681
class EntityIndexer(RecordIndexer):
    """Indexing class for mef.

    Queue messages carry an optional ``doc_type`` so that records of
    different entity classes can be bulk indexed through one queue.
    """

    def bulk_index(self, record_id_iterator, index=None, doc_type=None):
        """Bulk index records.

        :param record_id_iterator: Iterator yielding record UUIDs.
        :param index: The search engine index. (Default: ``None``)
        :param doc_type: entity type resolved with ``get_entity_class`` when
            the message is processed. (Default: ``None``)
        """
        self._bulk_op(
            record_id_iterator, op_type="index", index=index, doc_type=doc_type
        )

    def process_bulk_queue(self, search_bulk_kwargs=None, stats_only=True):
        """Process bulk indexing queue.

        :param dict search_bulk_kwargs: Passed to `search.helpers.bulk`.
        :param boolean stats_only: if `True` only report number of
            successful/failed operations instead of just number of
            successful and a list of error responses
        :returns: the result of the bulk helper.
        """
        with current_celery_app.pool.acquire(block=True) as conn:
            consumer = Consumer(
                connection=conn,
                queue=self.mq_queue.name,
                exchange=self.mq_exchange.name,
                routing_key=self.mq_routing_key,
            )
            try:
                req_timeout = current_app.config["INDEXER_BULK_REQUEST_TIMEOUT"]

                search_bulk_kwargs = search_bulk_kwargs or {}

                count = bulk(
                    self.client,
                    self._actionsiter(consumer.iterqueue()),
                    stats_only=stats_only,
                    request_timeout=req_timeout,
                    expand_action_callback=search.helpers.expand_action,
                    **search_bulk_kwargs,
                )
            finally:
                # Always release the consumer, even if bulk indexing fails.
                consumer.close()

        return count

    def _actionsiter(self, message_iterator):
        """Iterate bulk actions.

        A message is acknowledged once its action has been consumed;
        messages whose record cannot be built are rejected (and logged,
        except for ``NoResultFound``).

        :param message_iterator: Iterator yielding messages from a queue.
        """
        for message in message_iterator:
            payload = message.decode()
            try:
                if payload["op"] == "delete":
                    yield self._delete_action(payload=payload)
                else:
                    yield self._index_action(payload=payload)
                message.ack()
            except NoResultFound:
                message.reject()
            except Exception:
                message.reject()
                uid = payload.get("id", "???")
                current_app.logger.error(f"Failed to index record {uid}", exc_info=True)

    def _index_action(self, payload):
        """Bulk index action.

        :param payload: Decoded message body.
        :returns: Dictionary defining the search engine bulk 'index' action.
        """
        # Resolve the record class from the message when it names one.
        if doc_type := payload.get("doc_type"):
            record = get_entity_class(doc_type).get_record(payload["id"])
        else:
            record = self.record_cls.get_record(payload["id"])
        index = self.record_to_index(record)

        arguments = {}
        body = self._prepare_record(record, index, arguments)
        index = self._prepare_index(index)

        action = {
            "_op_type": "index",
            "_index": index,
            "_id": str(record.id),
            "_version": record.revision_id,
            "_version_type": self._version_type,
            "_source": body,
        }
        action.update(arguments)

        return action

    def _bulk_op(self, record_id_iterator, op_type, index=None, doc_type=None):
        """Index record in the search engine asynchronously.

        :param record_id_iterator: Iterator that yields record UUIDs.
        :param op_type: Indexing operation (one of ``index``, ``create``,
            ``delete`` or ``update``).
        :param index: The search engine index. (Default: ``None``)
        :param doc_type: entity type stored in each message.
            (Default: ``None``)
        """
        with self.create_producer() as producer:
            for rec in record_id_iterator:
                data = {
                    "id": str(rec),
                    "op": op_type,
                    "index": index,
                    "doc_type": doc_type,
                }
                producer.publish(
                    data,
                    declare=[self.mq_queue],
                    **self.mq_publish_kwargs,
                )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc