rero / rero-mef / build 4438608221 (pending completion)
push · github · Peter Weber
agents GND: get online record by SRU

47 of 47 new or added lines in 5 files covered. (100.0%)
3565 of 4250 relevant lines covered (83.88%)
0.84 hits per line

Source File: /rero_mef/utils.py (63.64% covered)

# -*- coding: utf-8 -*-
#
# RERO MEF
# Copyright (C) 2020 RERO
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 RERO.
#
# RERO Ebooks is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Utilities."""

import gc
import hashlib
import json
import os
import traceback
from copy import deepcopy
from datetime import datetime, timedelta, timezone
from io import StringIO
from json import JSONDecodeError, JSONDecoder, dumps
from time import sleep
from uuid import uuid4

import click
import ijson
import psycopg2
import requests
import sqlalchemy
from dateutil import parser
from flask import current_app
from invenio_cache.proxies import current_cache
from invenio_db import db
from invenio_oaiharvester.api import get_info_by_oai_name
from invenio_oaiharvester.errors import InvenioOAIHarvesterConfigNotFound, \
    WrongDateCombination
from invenio_oaiharvester.models import OAIHarvestConfig
from invenio_oaiharvester.utils import get_oaiharvest_object
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_pidstore.models import PersistentIdentifier
from invenio_records_rest.utils import obj_or_import_string
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from pymarc.marcxml import parse_xml_to_array
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from sickle import Sickle, oaiexceptions
from sickle.iterator import OAIItemIterator
from sickle.oaiexceptions import NoRecordsMatch

# Hours cannot be retrieved by get_info_by_oai_name
# TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
TIME_FORMAT = '%Y-%m-%d'


def add_oai_source(name, baseurl, metadataprefix='marc21',
                   setspecs='', comment='', update=False):
    """Add OAIHarvestConfig."""
    with current_app.app_context():
        source = OAIHarvestConfig.query.filter_by(name=name).first()
        if not source:
            source = OAIHarvestConfig(
                name=name,
                baseurl=baseurl,
                metadataprefix=metadataprefix,
                setspecs=setspecs,
                comment=comment
            )
            source.save()
            db.session.commit()
            return 'Added'
        elif update:
            source.name = name
            source.baseurl = baseurl
            source.metadataprefix = metadataprefix
            if setspecs != '':
                source.setspecs = setspecs
            if comment != '':
                source.comment = comment
            db.session.commit()
            return 'Updated'
        return 'Not Updated'
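
# Usage sketch for add_oai_source (all configuration values below are
# hypothetical examples, not an existing configuration):
#     result = add_oai_source(
#         name='agents.gnd',
#         baseurl='https://example.org/oai/repository',
#         metadataprefix='marc21',
#         setspecs='authorities',
#         update=True
#     )
#     # result is 'Added', 'Updated' or 'Not Updated'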


def oai_get_last_run(name, verbose=False):
    """Get the lastrun for an OAI harvest configuration.

    :param name: name of the OAI harvest configuration.
    :return: datetime of last OAI harvest run.
    """
    try:
        oai_source = get_oaiharvest_object(name)
        lastrun_date = oai_source.lastrun
        if verbose:
            click.echo(f'OAI {name}: last run: {lastrun_date}')
        return lastrun_date
    except InvenioOAIHarvesterConfigNotFound:
        if verbose:
            click.echo(f'ERROR OAI config not found: {name}')
        return None


def oai_set_last_run(name, date, verbose=False):
    """Set the lastrun for an OAI harvest configuration.

    :param name: name of the OAI harvest configuration.
    :param date: date to set as last run.
    :return: datetime of the date set.
    """
    try:
        oai_source = get_oaiharvest_object(name)
        lastrun_date = date
        if isinstance(date, str):
            lastrun_date = parser.isoparse(date)
        oai_source.update_lastrun(lastrun_date)
        oai_source.save()
        db.session.commit()
        if verbose:
            click.echo(f'OAI {name}: set last run: {lastrun_date}')
        return lastrun_date
    except InvenioOAIHarvesterConfigNotFound:
        if verbose:
            click.echo(f'ERROR OAI config not found: {name}')
    except ValueError as err:
        if verbose:
            click.echo(f'OAI set lastrun {name}: {err}')
    return None
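
# Usage sketch: reset the last run date before a full re-harvest (the
# configuration name 'agents.idref' is a hypothetical example):
#     oai_set_last_run('agents.idref', '2000-10-01', verbose=True)
#     last_run = oai_get_last_run('agents.idref')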


class MyOAIItemIterator(OAIItemIterator):
    """OAI item iterator with accessToken."""

    def next_resumption_token_and_items(self):
        """Get next resumption token and items."""
        self.resumption_token = self._get_resumption_token()
        self._items = self.oai_response.xml.iterfind(
            f'.//{self.sickle.oai_namespace}{self.element}')

    def _next_response(self):
        """Get the next response from the OAI server."""
        params = self.params
        access_token = params.get('accessToken')
        if self.resumption_token:
            params = {
                'resumptionToken': self.resumption_token.token,
                'verb': self.verb
            }
        if access_token:
            params['accessToken'] = access_token

        count = 0
        while count < 5:
            try:
                self.oai_response = self.sickle.harvest(**params)
                xml = self.oai_response.xml
                count = 5
            except Exception as err:
                count += 1
                current_app.logger.error(f'Sickle harvest {count} {err}')
                sleep(60)
        error = self.oai_response.xml.find(
            f'.//{self.sickle.oai_namespace}error')
        if error is not None:
            code = error.attrib.get('code', 'UNKNOWN')
            description = error.text or ''
            try:
                raise getattr(
                    oaiexceptions, code[0].upper() + code[1:])(description)
            except AttributeError:
                raise oaiexceptions.OAIError(description)
        if self.resumption_token:
            # Test we got a complete response ('resumptionToken' in xml)
            resumption_token_element = self.oai_response.xml.find(
                f'.//{self.sickle.oai_namespace}resumptionToken')

            if resumption_token_element is None:
                current_app.logger.error(
                    f'ERROR HARVESTING incomplete response: '
                    f'{self.resumption_token.cursor} '
                    f'{self.resumption_token.token}'
                )
                sleep(60)
            else:
                self.next_resumption_token_and_items()
        else:
            # first time
            self.next_resumption_token_and_items()


def oai_process_records_from_dates(name, sickle, oai_item_iterator,
                                   transformation, record_class, max_retries=0,
                                   access_token=None, days_span=30,
                                   from_date=None, until_date=None,
                                   ignore_deleted=False, dbcommit=True,
                                   reindex=True, test_md5=True,
                                   verbose=False, debug=False, **kwargs):
    """Harvest multiple records from an OAI repo.

    :param name: The name of the OAIHarvestConfig to use instead of passing
                 specific parameters.
    :param from_date: The lower bound date for the harvesting (optional).
    :param until_date: The upper bound date for the harvesting (optional).
    """
    from rero_mef.api import Action

    # Data on IDREF servers starts on 2000-10-01.
    url, metadata_prefix, last_run, setspecs = get_info_by_oai_name(name)

    request = sickle(url, iterator=oai_item_iterator, max_retries=max_retries)

    dates_initial = {
        'from': from_date or last_run,
        'until': until_date or datetime.now().isoformat()
    }
    update_last_run = from_date is None and until_date is None
    # Sanity check
    if dates_initial['until'] is not None \
            and dates_initial['from'] > dates_initial['until']:
        raise WrongDateCombination("'From' date is later than 'until' date.")

    # If we have no specifications for set searches, setspecs will be
    # a list with None so we enter the retrieval loop without
    # a set definition.
    setspecs = setspecs.split() or [None]
    count = 0
    action_count = {}
    mef_action_count = {}
    for spec in setspecs:
        dates = dates_initial
        params = {
            'metadataPrefix': metadata_prefix,
            'ignore_deleted': ignore_deleted
        }
        if access_token:
            params['accessToken'] = access_token
        if spec:
            params['set'] = spec

        from_date = parser.isoparse(dates_initial['from'])
        real_until_date = parser.isoparse(dates_initial['until'])
        while from_date <= real_until_date:
            until_date = from_date + timedelta(days=days_span)
            until_date = min(until_date, real_until_date)
            dates = {
                'from': from_date.strftime(TIME_FORMAT),
                'until': until_date.strftime(TIME_FORMAT)
            }
            params |= dates

            try:
                for idx, record in enumerate(request.ListRecords(**params), 1):
                    records = parse_xml_to_array(StringIO(record.raw))
                    try:
                        try:
                            updated = datetime.strptime(
                                records[0]['005'].data,
                                '%Y%m%d%H%M%S.%f'
                            )
                        except Exception as err:
                            updated = '????'
                        if rec := transformation(
                                records[0], logger=current_app.logger).json:
                            if msg := rec.get('NO TRANSFORMATION'):
                                if verbose:
                                    pid = rec.get('pid', '???')
                                    click.secho(
                                        f'NO TRANSFORMATION '
                                        f'{name} {idx} {pid}: {msg}',
                                        fg='yellow'
                                    )
                            else:
                                pid = rec.get('pid')
                                record, action = record_class.create_or_update(
                                    data=rec,
                                    dbcommit=True,
                                    reindex=True,
                                    test_md5=test_md5
                                )
                                count += 1
                                action_count.setdefault(action.name, 0)
                                action_count[action.name] += 1
                                if action in [
                                    Action.CREATE,
                                    Action.UPDATE,
                                    Action.REPLACE
                                ]:
                                    m_record, m_action = \
                                        record.create_or_update_mef(
                                            dbcommit=True, reindex=True)
                                else:
                                    m_action = Action.UPTODATE
                                    m_record = None
                                mef_action_count.setdefault(m_action.name, 0)
                                mef_action_count[m_action.name] += 1

                                if verbose:
                                    msg = (
                                        f'OAI {name} spec({spec}): {pid}'
                                        f' updated: {updated} {action.name}'
                                    )
                                    if m_record:
                                        msg = (
                                            f'{msg} | mef: {m_record.pid} '
                                            f'{m_action.name}'
                                        )
                                        if viaf_pid := m_record.get(
                                                'viaf_pid'):
                                            msg = f'{msg} | viaf: {viaf_pid}'
                                    click.echo(msg)
                        elif verbose:
                            click.echo(
                                f'NO TRANSFORMATION: {name} {idx}'
                                f'\n{records[0]}'
                            )
                    except Exception as err:
                        msg = f'Creating {name} {idx}: {err}'
                        if rec:
                            msg += f'\n{rec}'
                        current_app.logger.error(msg)
                        if debug:
                            traceback.print_exc()
            except NoRecordsMatch:
                from_date = from_date + timedelta(days=days_span + 1)
                continue
            except Exception as err:
                current_app.logger.error(err)
                if debug:
                    traceback.print_exc()
                count = -1
                if verbose:
                    click.echo(
                        f'OAI {name} {spec}: '
                        f'{from_date.strftime(TIME_FORMAT)} .. '
                        f'{until_date.strftime(TIME_FORMAT)}'
                    )
            from_date = from_date + timedelta(days=days_span + 1)

    if update_last_run:
        last_run = dates_initial['until']
        if verbose:
            click.echo(f'OAI {name}: update last run: {last_run}')
        oai_source = get_oaiharvest_object(name)
        oai_source.update_lastrun(parser.isoparse(last_run))
        oai_source.save()
        db.session.commit()
    return count, action_count, mef_action_count


def oai_save_records_from_dates(name, file_name, sickle, oai_item_iterator,
                                max_retries=0,
                                access_token=None, days_span=30,
                                from_date=None, until_date=None,
                                verbose=False, **kwargs):
    """Harvest and save multiple records from an OAI repo.

    :param name: The name of the OAIHarvestConfig to use instead of passing
                 specific parameters.
    :param from_date: The lower bound date for the harvesting (optional).
    :param until_date: The upper bound date for the harvesting (optional).
    """
    url, metadata_prefix, last_run, setspecs = get_info_by_oai_name(name)

    request = sickle(url, iterator=oai_item_iterator, max_retries=max_retries)

    dates_initial = {
        'from': from_date or last_run,
        'until': until_date or datetime.now().isoformat()
    }
    # Sanity check
    if dates_initial['from'] > dates_initial['until']:
        raise WrongDateCombination("'From' date is later than 'until' date.")

    # If we have no specifications for set searches, setspecs will be
    # a list with None so we enter the retrieval loop without
    # a set definition.
    setspecs = setspecs.split() or [None]
    count = 0
    with open(file_name, 'bw') as output_file:
        for spec in setspecs:
            params = {
                'metadataPrefix': metadata_prefix,
                'ignore_deleted': False
            }
            if access_token:
                params['accessToken'] = access_token
            if spec:
                params['set'] = spec

            from_date = parser.isoparse(dates_initial['from'])
            real_until_date = parser.isoparse(dates_initial['until'])
            while from_date <= real_until_date:
                until_date = from_date + timedelta(days=days_span)
                if until_date > real_until_date:
                    until_date = real_until_date
                dates = {
                    'from': from_date.strftime(TIME_FORMAT),
                    'until': until_date.strftime(TIME_FORMAT)
                }
                params |= dates
                try:
                    for record in request.ListRecords(**params):
                        count += 1
                        records = parse_xml_to_array(StringIO(record.raw))
                        rec = records[0]
                        if verbose:
                            click.echo(
                                f'OAI {name} spec({spec}): '
                                f'{from_date.strftime(TIME_FORMAT)} '
                                f'count:{count:>10} = {rec["001"].data}'
                            )
                        rec.leader = f'{rec.leader[:9]}a{rec.leader[10:]}'
                        output_file.write(rec.as_marc())
                except NoRecordsMatch:
                    from_date = from_date + timedelta(days=days_span + 1)
                    continue
                except Exception as err:
                    current_app.logger.error(err)
                if verbose:
                    click.echo(
                        f'OAI {name} {spec}: '
                        f'{from_date.strftime(TIME_FORMAT)} .. '
                        f'{until_date.strftime(TIME_FORMAT)}'
                    )
                from_date = from_date + timedelta(days=days_span + 1)
    if verbose:
        click.echo(f'OAI {name}: {count}')
    return count


def oai_get_record(id, name, transformation, access_token=None,
                   identifier=None, debug=False, **kwargs):
    """Get record from an OAI repo.

    :param identifier: identifier of record.
    """
    url, metadata_prefix, lastrun, setspecs = get_info_by_oai_name(name)

    request = Sickle(
        endpoint=url,
        max_retries=5,
        default_retry_after=10,
        retry_status_codes=[423, 503]
    )

    params = {
        'metadataPrefix': metadata_prefix,
        'identifier': f'{identifier}{id}'
    }
    full_url = f'{url}?verb=GetRecord&metadataPrefix={metadata_prefix}'
    full_url = f'{full_url}&identifier={identifier}{id}'

    if access_token:
        params['accessToken'] = access_token
        full_url = f'{full_url}&accessToken={access_token}'

    try:
        record = request.GetRecord(**params)
        msg = f'OAI-{name:<12} get: {id:<15} {full_url} | OK'
    except Exception as err:
        msg = f'OAI-{name:<12} get: {id:<15} {full_url} | NO RECORD'
        if debug:
            raise
        return None, msg
    records = parse_xml_to_array(StringIO(record.raw))
    if debug:
        from rero_mef.marctojson.helper import display_record
        display_record(records[0])
    trans_record = transformation(records[0], logger=current_app.logger).json
    return trans_record, msg
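
# Usage sketch for oai_get_record (the config name, identifier prefix and
# transformation class are hypothetical examples):
#     data, msg = oai_get_record(
#         id='123456789',
#         name='agents.gnd',
#         transformation=Transformation,
#         identifier='oai:example.org:'
#     )
#     # data is the transformed JSON record, or None if no record was found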


def read_json_record(json_file, buf_size=1024, decoder=JSONDecoder()):
    """Read JSON records lazily from a file.

    :param json_file: JSON file handle
    :param buf_size: buffer size for file read
    :param decoder: decoder to use for decoding
    :return: record Generator
    """
    buffer = json_file.read(2).replace('\n', '')
    # we have to delete the first [ for a list of records
    if buffer.startswith('['):
        buffer = buffer[1:].lstrip()
    while True:
        block = json_file.read(buf_size)
        if not block:
            break
        buffer += block.replace('\n', '')
        pos = 0
        while True:
            try:
                buffer = buffer.lstrip()
                obj, pos = decoder.raw_decode(buffer)
            except JSONDecodeError:
                break
            else:
                yield obj
                buffer = buffer[pos:].lstrip()

                if len(buffer) <= 0:
                    # buffer is empty: read more data
                    buffer = json_file.read(buf_size)
                if buffer.startswith(','):
                    # delete record delimiters
                    buffer = buffer[1:].lstrip()
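
# Usage sketch: stream records one by one from a large JSON array file
# without loading the whole file into memory:
#     with open('records.json') as json_file:
#         for record in read_json_record(json_file):
#             print(record.get('pid'))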


def export_json_records(pids, pid_type, output_file_name, indent=2,
                        schema=True, verbose=False):
    """Write records from record_class to file.

    :param pids: pids to use
    :param pid_type: pid_type to use
    :param output_file_name: file name to write to
    :param indent: indent to use in output file
    :param schema: do not delete $schema
    :param verbose: verbose print
    :returns: count of records written
    """
    record_class = get_entity_class(pid_type)
    count = 0
    outfile = JsonWriter(output_file_name, indent=indent)
    for pid in pids:
        try:
            rec = record_class.get_record_by_pid(pid)
            count += 1
            if verbose:
                click.echo(
                    f'{count: <8} {pid_type} export {rec.pid}:{rec.id}'
                )
            if not schema:
                rec.pop('$schema', None)
                for source in ('idref', 'gnd', 'rero'):
                    if source in rec:
                        rec[source].pop('$schema', None)
            outfile.write(rec)
        except Exception as err:
            click.echo(err)
            click.echo(f'ERROR: Cannot export pid:{pid}')
    return count


def number_records_in_file(json_file, file_type):
    """Get number of records per file."""
    count = 0
    with open(json_file, 'r', buffering=1) as file:
        for line in file:
            if (file_type == 'json' and '"pid"' in line) \
                    or file_type == 'csv':
                count += 1
    return count


def progressbar(items, length=0, verbose=False):
    """Verbose progress bar."""
    if verbose:
        with click.progressbar(
            items, label=str(length), length=length
        ) as progressbar_items:
            yield from progressbar_items
    else:
        yield from items
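
# Usage sketch: wrap any iterable; the click progress bar is only
# displayed when verbose is True:
#     for pid in progressbar(items=range(1000), length=1000, verbose=True):
#         pass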


def get_host():
    """Get the host from the config."""
    # from flask import current_app
    # with current_app.app_context():
    #     return current_app.config.get('JSONSCHEMAS_HOST')
    return 'mef.rero.ch'


def resolve_record(path, object_class):
    """Resolve local records.

    :param path: pid for record
    :param object_class: record class to use
    :returns: record for pid or {}
    """
    try:
        return object_class.get_record_by_pid(path)
    except PIDDoesNotExistError:
        return {}


def metadata_csv_line(record, record_uuid, date):
    """Build CSV metadata table line."""
    created_date = updated_date = date
    sep = '\t'
    metadata = (
        created_date,
        updated_date,
        record_uuid,
        json.dumps(record).replace('\\', '\\\\'),
        '1',
    )
    metadata_line = sep.join(metadata)
    return metadata_line + os.linesep


def pidstore_csv_line(agent, agent_pid, record_uuid, date):
    """Build CSV pidstore table line."""
    created_date = updated_date = date
    sep = '\t'
    pidstore_data = [
        created_date,
        updated_date,
        agent,
        agent_pid,
        'R',
        'rec',
        record_uuid,
    ]
    pidstore_line = sep.join(pidstore_data)
    return pidstore_line + os.linesep


def raw_connection():
    """Return a raw connection to the database."""
    with current_app.app_context():
        URI = current_app.config.get('SQLALCHEMY_DATABASE_URI')
        engine = sqlalchemy.create_engine(URI)
        # conn = engine.connect()
        connection = engine.raw_connection()
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        return connection


def db_copy_from(buffer, table, columns):
    """Copy data from file to db."""
    connection = raw_connection()
    cursor = connection.cursor()
    try:
        cursor.copy_from(
            file=buffer,
            table=table,
            columns=columns,
            sep='\t'
        )
        connection.commit()
    except psycopg2.DataError as err:
        current_app.logger.error(f'data load error: {err}')
    # cursor.execute(f'VACUUM ANALYSE {table}')
    # cursor.close()
    connection.close()


def db_copy_to(filehandle, table, columns):
    """Copy data from db to file."""
    connection = raw_connection()
    cursor = connection.cursor()
    try:
        cursor.copy_to(
            file=filehandle,
            table=table,
            columns=columns,
            sep='\t'
        )
        cursor.connection.commit()
    except psycopg2.DataError as err:
        current_app.logger.error(f'data load error: {err}')
    cursor.execute(f'VACUUM ANALYSE {table}')
    cursor.close()
    connection.close()


def bulk_index(agent, uuids, verbose=False):
    """Bulk index records."""
    if verbose:
        click.echo(f' add to index: {len(uuids)}')
    retry = True
    minutes = 1
    from .api import ReroIndexer
    while retry:
        try:
            ReroIndexer().bulk_index(uuids, doc_type=agent)
            retry = False
        except Exception as exc:
            msg = f'Bulk Index Error: retry in {minutes} min {exc}'
            current_app.logger.error(msg)
            if verbose:
                click.secho(msg, fg='red')
            sleep(minutes * 60)
            retry = True
            minutes *= 2


def bulk_load_agent(agent, data, table, columns, bulk_count=0, verbose=False,
                    reindex=False):
    """Bulk load agent data to table."""
    if bulk_count <= 0:
        bulk_count = current_app.config.get('BULK_CHUNK_COUNT', 100000)
    count = 0
    buffer = StringIO()
    buffer_uuid = []
    index = columns.index('id') if 'id' in columns else -1
    start_time = datetime.now(timezone.utc)
    with open(data, 'r', encoding='utf-8', buffering=1) as input_file:
        for line in input_file:
            count += 1
            buffer.write(line)
            if index >= 0 and reindex:
                buffer_uuid.append(line.split('\t')[index])
            if count % bulk_count == 0:
                buffer.flush()
                buffer.seek(0)
                if verbose:
                    end_time = datetime.now(timezone.utc)
                    diff_time = end_time - start_time
                    start_time = end_time
                    click.echo(
                        f'{agent} copy from file: '
                        f'{count} {diff_time.seconds}s',
                        nl=False
                    )
                db_copy_from(buffer=buffer, table=table, columns=columns)
                buffer.close()

                if index >= 0 and reindex:
                    bulk_index(agent=agent, uuids=buffer_uuid,
                               verbose=verbose)
                    buffer_uuid.clear()
                elif verbose:
                    click.echo()

                # force the Garbage Collector to release unreferenced memory
                # gc.collect()
                # new buffer
                buffer = StringIO()

        if verbose:
            end_time = datetime.now(timezone.utc)
            diff_time = end_time - start_time
            click.echo(
                f'{agent} copy from file: {count} {diff_time.seconds}s',
                nl=False
            )
        buffer.flush()
        buffer.seek(0)
        db_copy_from(buffer=buffer, table=table, columns=columns)
        buffer.close()
        if index >= 0 and reindex:
            bulk_index(agent=agent, uuids=buffer_uuid, verbose=verbose)
            buffer_uuid.clear()
        elif verbose:
            click.echo()

    # force the Garbage Collector to release unreferenced memory
    gc.collect()


def bulk_load_metadata(agent, metadata, bulk_count=0, verbose=True,
                       reindex=False):
    """Bulk load agent data to metadata table."""
    agent_class = get_entity_class(agent)
    table, identifier = agent_class.get_metadata_identifier_names()
    columns = (
        'created',
        'updated',
        'id',
        'json',
        'version_id'
    )
    bulk_load_agent(
        agent=agent,
        data=metadata,
        table=table,
        columns=columns,
        bulk_count=bulk_count,
        verbose=verbose,
        reindex=reindex
    )


def bulk_load_pids(agent, pidstore, bulk_count=0, verbose=True, reindex=False):
    """Bulk load agent data to pidstore table."""
    table = 'pidstore_pid'
    columns = (
        'created',
        'updated',
        'pid_type',
        'pid_value',
        'status',
        'object_type',
        'object_uuid',
    )
    bulk_load_agent(
        agent=agent,
        data=pidstore,
        table=table,
        columns=columns,
        bulk_count=bulk_count,
        verbose=verbose,
        reindex=reindex
    )


def bulk_load_ids(agent, ids, bulk_count=0, verbose=True, reindex=False):
    """Bulk load agent data to id table."""
    agent_class = get_entity_class(agent)
    metadata, identifier = agent_class.get_metadata_identifier_names()
    columns = ('recid', )
    bulk_load_agent(
        agent=agent,
        data=ids,
        table=identifier,
        columns=columns,
        bulk_count=bulk_count,
        verbose=verbose,
        reindex=reindex
    )


def bulk_save_agent(agent, file_name, table, columns, verbose=False):
    """Bulk save agent data to file."""
    with open(file_name, 'w', encoding='utf-8') as output_file:
        db_copy_to(
            filehandle=output_file,
            table=table,
            columns=columns
        )


def bulk_save_metadata(agent, file_name, verbose=False):
    """Bulk save agent data from metadata table."""
    if verbose:
        click.echo(f'{agent} save to file: {file_name}')
    agent_class = get_entity_class(agent)
    metadata, identifier = agent_class.get_metadata_identifier_names()
    columns = (
        'created',
        'updated',
        'id',
        'json',
        'version_id'
    )
    bulk_save_agent(
        agent=agent,
        file_name=file_name,
        table=metadata,
        columns=columns,
        verbose=verbose
    )


def bulk_save_pids(agent, file_name, verbose=False):
    """Bulk save agent data from pids table."""
    if verbose:
        click.echo(f'{agent} save to file: {file_name}')
    table = 'pidstore_pid'
    columns = (
        'created',
        'updated',
        'pid_type',
        'pid_value',
        'status',
        'object_type',
        'object_uuid',
    )
    tmp_file_name = f'{file_name}_tmp'
    bulk_save_agent(
        agent=agent,
        file_name=tmp_file_name,
        table=table,
        columns=columns,
        verbose=verbose
    )
    # clean pid file
    with open(tmp_file_name, 'r') as file_in:
        with open(file_name, 'w') as file_out:
            file_out.writelines(line for line in file_in if agent in line)
    os.remove(tmp_file_name)


def bulk_save_ids(agent, file_name, verbose=False):
    """Bulk save agent data from id table."""
    if verbose:
        click.echo(f'{agent} save to file: {file_name}')
    agent_class = get_entity_class(agent)
    metadata, identifier = agent_class.get_metadata_identifier_names()
    columns = ('recid', )
    bulk_save_agent(
        agent=agent,
        file_name=file_name,
        table=identifier,
        columns=columns,
        verbose=verbose
    )


def create_md5(record):
    """Create md5 for record."""
    return hashlib.md5(
        json.dumps(record, sort_keys=True).encode('utf-8')
    ).hexdigest()


def add_md5(record):
    """Add md5 to json."""
    schema = record.pop('$schema') if record.get('$schema') else None
    if record.get('md5'):
        record.pop('md5')
    record['md5'] = create_md5(record)
    if schema:
        record['$schema'] = schema
    return record
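
# Note on add_md5: the md5 is computed over the record without its
# '$schema' and 'md5' keys, so the hash can be recomputed later for
# change detection. A minimal sketch:
#     record = add_md5({'$schema': 'https://example.org/s.json', 'pid': '1'})
#     assert record['md5'] == create_md5({'pid': '1'})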


def add_schema(record, agent):
    """Add the $schema to the record."""
    with current_app.app_context():
        schemas = current_app.config.get('RECORDS_JSON_SCHEMA')
        if agent in schemas:
            base_url = current_app.config.get('RERO_MEF_APP_BASE_URL')
            endpoint = current_app.config.get('JSONSCHEMAS_ENDPOINT')
            schema = schemas[agent]
            record['$schema'] = f'{base_url}{endpoint}{schema}'
    return record


def create_csv_file(input_file, agent, pidstore, metadata):
    """Create agent CSV file to load."""
    count = 0
    with \
            open(input_file, 'r', encoding='utf-8') as agent_file, \
            open(metadata, 'w', encoding='utf-8') as agent_metadata_file, \
            open(pidstore, 'w', encoding='utf-8') as agent_pids_file:

        for record in ijson.items(agent_file, "item"):
            if agent == 'viaf':
                record['pid'] = record['viaf_pid']

            ordered_record = add_md5(record)
            add_schema(ordered_record, agent)

            record_uuid = str(uuid4())
            date = str(datetime.now(timezone.utc))

            agent_metadata_file.write(
                metadata_csv_line(ordered_record, record_uuid, date)
            )

            agent_pids_file.write(
                pidstore_csv_line(agent, record['pid'], record_uuid, date)
            )
            count += 1
    return count


def get_entity_classes(without_mef_viaf=True):
    """Get agent classes from config."""
    agents = {}
    endpoints = deepcopy(current_app.config.get('RECORDS_REST_ENDPOINTS', {}))
    if without_mef_viaf:
        endpoints.pop('mef', None)
        endpoints.pop('viaf', None)
        endpoints.pop('comef', None)
    for agent in endpoints:
        if record_class := obj_or_import_string(
                endpoints[agent].get('record_class')):
            agents[agent] = record_class
    return agents

974

975
def get_endpoint_class(entity, class_name):
1✔
976
    """Get entity class from config."""
977
    endpoints = current_app.config.get('RECORDS_REST_ENDPOINTS', {})
1✔
978
    if endpoint := endpoints.get(entity, {}):
1✔
979
        return obj_or_import_string(endpoint.get(class_name))
1✔
980

981

982
def get_entity_class(entity):
1✔
983
    """Get entity record class from config."""
984
    if entity := get_endpoint_class(entity=entity, class_name='record_class'):
1✔
985
        return entity
1✔
986

987

988
def get_entity_search_class(entity):
1✔
989
    """Get entity search class from config."""
990
    if search := get_endpoint_class(entity=entity, class_name='search_class'):
1✔
991
        return search
1✔
992

993

994
def get_entity_indexer_class(entity):
1✔
995
    """Get entity indexer class from config."""
996
    if search := get_endpoint_class(entity=entity, class_name='indexer_class'):
×
997
        return search
×
998

999

1000
def write_link_json(
1✔
1001
    agent,
1002
    pidstore_file,
1003
    metadata_file,
1004
    viaf_pid,
1005
    corresponding_data,
1006
    agent_pid,
1007
    verbose=False
1008
):
1009
    """Write a JSON record into file."""
1010
    key_per_catalog_id = {
1✔
1011
        'DNB': 'gnd_pid',
1012
        'RERO': 'rero_pid',
1013
        'SUDOC': 'idref_pid'
1014
    }
1015
    json_data = {'viaf_pid': viaf_pid}
1✔
1016
    write_to_file_viaf = False
1✔
1017
    for key, value in corresponding_data.items():
1✔
1018
        if key in key_per_catalog_id:
1✔
1019
            json_data[key_per_catalog_id[key]] = value
1✔
1020
            write_to_file_viaf = True
1✔
1021
    write_to_file = False
1✔
1022
    json_dump = json_data
1✔
1023

1024
    agent_pid = viaf_pid
1✔
1025
    add_schema(json_dump, 'viaf')
1✔
1026
    json_dump['pid'] = agent_pid
1✔
1027
    del json_dump['viaf_pid']
1✔
1028
    # only save VIAF data with used pids
1029
    write_to_file = write_to_file_viaf if agent == 'viaf' else True
1✔
1030
    if write_to_file:
1✔
1031
        record_uuid = str(uuid4())
1✔
1032
        date = str(datetime.now(timezone.utc))
1✔
1033
        pidstore_file.write(
1✔
1034
            pidstore_csv_line(agent, agent_pid, record_uuid, date)
1035
        )
1036
        metadata_file.write(metadata_csv_line(json_dump, record_uuid, date))
1✔
1037
        if verbose:
1✔
1038
            click.echo(f'  {agent}: {json_dump}')
×
1039
    return write_to_file
1✔


def append_fixtures_new_identifiers(identifier, pids, pid_type):
    """Insert pids into the identifier table and update its sequence."""
    with db.session.begin_nested():
        for pid in pids:
            db.session.add(identifier(recid=pid))
        max_pid = PersistentIdentifier.query.filter_by(
            pid_type=pid_type
        ).order_by(sqlalchemy.desc(
            sqlalchemy.cast(PersistentIdentifier.pid_value, sqlalchemy.Integer)
        )).first().pid_value
        identifier._set_sequence(max_pid)


def set_timestamp(name, **kwargs):
    """Set timestamp in current cache.

    Allows timestamping of functionality and external monitoring of the
    changed timestamps via URL requests.

    :param name: name of time stamp.
    :returns: time of time stamp
    """
    time_stamps = current_cache.get('timestamps')
    if not time_stamps:
        time_stamps = {}
    utc_now = datetime.now(timezone.utc)
    time_stamps[name] = {}
    time_stamps[name]['time'] = utc_now
    for key, value in kwargs.items():
        time_stamps[name][key] = value
    current_cache.set('timestamps', time_stamps)
    return utc_now


def get_timestamp(name):
    """Get timestamp in current cache.

    :param name: name of time stamp.
    :returns: time of time stamp
    """
    time_stamps = current_cache.get('timestamps')
    return time_stamps.get(name) if time_stamps else None
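
# Usage sketch: timestamps live in the Invenio cache under the key
# 'timestamps' ('harvest_gnd' and 'count' are hypothetical examples):
#     set_timestamp('harvest_gnd', count=42)
#     info = get_timestamp('harvest_gnd')
#     # info -> {'time': <datetime in UTC>, 'count': 42}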


class JsonWriter(object):
    """Json Writer."""

    count = 0

    def __init__(self, filename, indent=2):
        """Constructor.

        :param filename: file name of the file to be written.
        :param indent: indentation.
        """
        self.indent = indent
        self.file_handle = open(filename, 'w')
        self.file_handle.write('[')

    def __del__(self):
        """Destructor."""
        if self.file_handle:
            self.file_handle.write('\n]')
            self.file_handle.close()
            self.file_handle = None

    def __enter__(self):
        """Context manager enter."""
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        """Context manager exit.

        :param exception_type: indicates the class of the exception.
        :param exception_value: indicates the type of the exception,
            like a divide_by_zero or floating_point error, which are
            types of arithmetic exceptions.
        :param exception_traceback: a report with all the information
            needed to resolve the exception.
        """
        self.__del__()

    def write(self, data):
        """Write data to file.

        :param data: JSON data to write into the file.
        """
        if self.count > 0:
            self.file_handle.write(',')
        if self.indent:
            for line in dumps(data, indent=self.indent).split('\n'):
                self.file_handle.write(f'\n{" ".ljust(self.indent)}')
                self.file_handle.write(line)
        else:
            self.file_handle.write(dumps(data, separators=(',', ':')))
        self.count += 1

    def close(self):
        """Close file."""
        self.__del__()
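
# Usage sketch: JsonWriter writes a valid JSON array incrementally; the
# closing bracket is added on close() or when the writer is destroyed:
#     with JsonWriter('dump.json', indent=2) as writer:
#         writer.write({'pid': '1'})
#         writer.write({'pid': '2'})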


def mef_get_all_missing_entity_pids(mef_class, entity, verbose=False):
    """Get all missing agent pids.

    :param mef_class: MEF class to use.
    :param entity: entity name to get the missing pids.
    :param verbose: Verbose.
    :returns: missing pids, non-existing pids and pids without entity pid.
    """
    record_class = get_entity_class(entity)
    non_existing_pids = {}
    no_pids = []
    if verbose:
        click.echo(f'Get pids from {entity} ...')
    progress = progressbar(
        items=record_class.get_all_pids(),
        length=record_class.count(),
        verbose=verbose
    )
    missing_pids = {pid: 1 for pid in progress}
    name = record_class.name
    if verbose:
        click.echo(f'Get pids for {name} from MEF and calculate missing ...')
    query = mef_class.search().filter('exists', field=name)
    progress = progressbar(
        items=query.source(['pid', name]).scan(),
        length=query.count(),
        verbose=True
    )
    for hit in progress:
        data = hit.to_dict()
        if agent_pid := data.get(name, {}).get('pid'):
            res = missing_pids.pop(agent_pid, False)
            if not res:
                non_existing_pids[hit.pid] = agent_pid
        else:
            no_pids.append(hit.pid)
    return list(missing_pids), non_existing_pids, no_pids


def get_mefs_endpoints():
    """Get all endpoints for MEFs."""
    from .agents import AgentMefRecord
    from .agents.utils import get_agent_endpoints
    from .concepts import ConceptMefRecord
    from .concepts.utils import get_concept_endpoints

    mefs = [{
        'mef_class': AgentMefRecord,
        'endpoints': get_agent_endpoints()
    }]
    mefs.append({
        'mef_class': ConceptMefRecord,
        'endpoints': get_concept_endpoints()
    })
    return mefs


def generate(search, deleted):
    """Lazily generate a JSON array from search hits and deleted records."""
    yield '['
    idx = 0
    for hit in search.scan():
        if idx != 0:
            yield ', '
        yield json.dumps(hit.to_dict())
        idx += 1
    for idx_deleted, record in enumerate(deleted):
        if idx + idx_deleted != 0:
            yield ', '
        yield json.dumps(record)
    yield ']'
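
# Usage sketch: generate is meant for streaming; in a hypothetical Flask
# view it could be wrapped in a Response like this:
#     from flask import Response
#     response = Response(generate(search, deleted),
#                         mimetype='application/json')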


def requests_retry_session(retries=5, backoff_factor=0.5,
                           status_forcelist=(500, 502, 504), session=None):
    """Request retry session.

    :param retries: the total number of retry attempts to make.
    :param backoff_factor: sleep between failed requests:
        {backoff factor} * (2 ** ({number of total retries} - 1))
    :param status_forcelist: the HTTP response codes to retry on.
    :param session: session to use.
    :returns: HTTP request session.
    """
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
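
# Usage sketch: with the defaults, failed requests for the listed status
# codes are retried with sleeps growing roughly as
# backoff_factor * 2 ** (retry number - 1):
#     session = requests_retry_session(retries=3)
#     response = session.get('https://example.org/api/')  # hypothetical URL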