• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sul-dlss / libsys-airflow / 10427186039

16 Aug 2024 10:45PM UTC coverage: 83.867% (-0.5%) from 84.325%
10427186039

Pull #1140

github

web-flow
Merge 0afbbe571 into 331037d9f
Pull Request #1140: Error report and email for OCLC errors with new MARC records

54 of 106 new or added lines in 4 files covered. (50.94%)

8 existing lines in 2 files now uncovered.

4273 of 5095 relevant lines covered (83.87%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.33
/libsys_airflow/plugins/data_exports/oclc_api.py
1
import copy
1✔
2
import json
1✔
3
import logging
1✔
4
import pathlib
1✔
5
import re
1✔
6

7
import httpx
1✔
8
import pymarc
1✔
9

10
from typing import List, Union, Callable
1✔
11

12
from bookops_worldcat import WorldcatAccessToken, MetadataSession
1✔
13
from bookops_worldcat.errors import WorldcatRequestError
1✔
14

15
from libsys_airflow.plugins.data_exports.marc.oclc import get_record_id
1✔
16
from libsys_airflow.plugins.data_exports.marc.excluded_tags import oclc_excluded
1✔
17

18
from libsys_airflow.plugins.shared.folio_client import folio_client
1✔
19

20
from pymarc import Record
1✔
21

22
logger = logging.getLogger(__name__)
1✔
23

24
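# Matches 035 $a values such as "(OCoLC)12345" or "(OCoLC-M)12345"; group 1
# is the optional "-M" suffix and group 2 is the OCLC number.
# If we want to also match on OCoLC-I, just replace -[M] with -[MI]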
25
OCLC_REGEX = re.compile(r"\(OCoLC(-[M])?\)(\w+)")
1✔
26

27

28
def get_instance_uuid(record) -> Union[str, None]:
    """
    Returns the value of subfield i from a 999 field whose indicators are
    both "f", or None when the record has no such value.

    When several 999 ff fields carry a subfield i, the value from the last
    one wins. An error is logged whenever None is returned.
    """
    ff_indicators = pymarc.Indicators(first="f", second="f")
    instance_uuid = None
    for field in record.get_fields("999"):
        if field.indicators == ff_indicators:
            # get_subfields returns an empty list for a 999 ff field that
            # lacks subfield i, so a malformed field is skipped instead of
            # raising and aborting the caller's whole batch
            uuid_values = field.get_subfields("i")
            if uuid_values:
                instance_uuid = uuid_values[0]
    if instance_uuid is None:
        logger.error("No instance UUID found in MARC record")
    return instance_uuid
1✔
36

37

38
def oclc_records_operation(**kwargs) -> dict:
    """
    Runs one OCLCAPIWrapper operation for every library's records.

    Keyword arguments:
      oclc_function -- name of the OCLCAPIWrapper method to call
      connections   -- dict keyed by library; each value supplies the
                       "username" and "password" used as the OCLC client id
                       and secret
      records       -- dict keyed by library of the records handed to the
                       OCLC method
      success       -- optional dict that receives per-library successes
      failures      -- optional dict that receives per-library failures

    Returns a dict with "success" and "failures" (both keyed by library) and
    "archive", the list of files reported as archivable by every library.
    """
    function_name: str = kwargs["oclc_function"]
    connection_lookup: dict = kwargs["connections"]

    type_of_records: dict = kwargs["records"]
    success: dict = kwargs.get("success", {})
    failures: dict = kwargs.get("failures", {})
    # Created once, outside the loop, so that files from every library are
    # kept and the name is bound even when there are no libraries at all
    archive_files: list = []

    for library, records in type_of_records.items():
        success[library] = []
        failures[library] = []

        oclc_api = OCLCAPIWrapper(
            client_id=connection_lookup[library]["username"],
            secret=connection_lookup[library]["password"],
        )

        oclc_api_function = getattr(oclc_api, function_name)

        if not records:
            logger.info(f"No {function_name} records for {library}")
            continue

        oclc_result = oclc_api_function(records)
        success[library].extend(oclc_result['success'])
        failures[library].extend(oclc_result['failures'])
        # A result without an "archive" key means nothing to archive
        archive_files.extend(oclc_result.get('archive', []))
        logger.info(
            f"Processed {function_name} for {library} successful {len(success[library])} failures {len(failures[library])}"
        )

    return {"success": success, "failures": failures, "archive": archive_files}
1✔
73

74

75
class OCLCAPIWrapper(object):
    """
    Helper class for transmitting MARC records to OCLC Worldcat API

    The public operations (delete, match, new, update) each take a list of
    MARC file paths and return a dict with three keys:
      "success"  -- list of instance UUIDs that were processed
      "failures" -- list of dicts with "uuid", "reason" and "context"
      "archive"  -- list of files that had at least one success and no
                    failures
    """

    def __init__(self, **kwargs):
        """
        Creates the HTTP client, retrieves the Worldcat access token and
        then creates the FOLIO client.

        Keyword arguments:
          client_id -- OCLC client key
          secret    -- OCLC client secret
        """
        self.oclc_token = None
        client_id = kwargs["client_id"]
        secret = kwargs["secret"]
        self.httpx_client = httpx.Client()
        self.__authenticate__(client_id, secret)
        self.folio_client = folio_client()

    def __authenticate__(self, client_key, secret) -> None:
        """
        Stores a WorldcatAccessToken scoped to WorldCatMetadataAPI on the
        instance; any failure is logged and re-raised as an Exception whose
        args are the message and the original error.
        """
        try:
            self.oclc_token = WorldcatAccessToken(
                key=client_key, secret=secret, scopes="WorldCatMetadataAPI"
            )
        except Exception as e:
            msg = "Unable to Retrieve Worldcat Access Token"
            logger.error(msg)
            # Chain the cause so the original traceback is preserved
            raise Exception(msg, e) from e

    def __get_srs_record_id__(self, instance_uuid: str) -> Union[str, None]:
        """
        Looks up the source records for the instance and returns the
        recordId of the first one, or None (after logging) when there are
        none or the response cannot be read.
        """
        source_storage_result = self.folio_client.folio_get(
            f"/source-storage/source-records?instanceId={instance_uuid}"
        )

        try:
            source_records = source_storage_result['sourceRecords']
            if len(source_records) < 1:
                logger.error(f"No Active SRS record found for {instance_uuid}")
                return None
            return source_records[0]['recordId']

        except Exception as e:
            logger.error(
                f"Failed to retrieve Active SRS record id for Instance {instance_uuid} error: {e}"
            )
            return None

    def __instance_info__(self, instance_uuid: str) -> tuple:
        """
        Returns the (_version, hrid) pair of the FOLIO instance.
        """
        instance = self.folio_client.folio_get(f"/inventory/instances/{instance_uuid}")
        return instance["_version"], instance["hrid"]

    def __put_folio_record__(self, instance_uuid: str, record: Record) -> bool:
        """
        Updates FOLIO SRS with updated MARC record with new OCLC Number
        in the 035 field

        Returns True only when the PUT answers with status 202; a missing
        SRS record id or any other status is logged and returns False.
        """
        marc_json = record.as_json()
        version, instance_hrid = self.__instance_info__(instance_uuid)
        srs_uuid = self.__get_srs_record_id__(instance_uuid)
        if srs_uuid is None:
            logger.error(
                f"Failed to retrieve Active SRS uuid for Instance {instance_uuid}"
            )
            return False

        request_body = {
            "id": srs_uuid,
            "recordType": "MARC_BIB",
            "relatedRecordVersion": version,
            "parsedRecord": {"content": marc_json},
            "externalIdsHolder": {
                "instanceId": instance_uuid,
                "instanceHrid": instance_hrid,
            },
        }
        put_result = self.httpx_client.put(
            f"{self.folio_client.okapi_url}/change-manager/parsedRecords/{srs_uuid}",
            headers=self.folio_client.okapi_headers,
            json=request_body,
        )
        if put_result.status_code != 202:
            logger.error(
                f"Failed to update FOLIO for Instance {instance_uuid} with SRS {srs_uuid}"
            )
            return False
        return True

    def __read_marc_files__(self, marc_files: list) -> list:
        """
        Reads every existing file and returns a list of
        (record, file path string) tuples.

        Missing files and records the reader could not parse are logged and
        skipped.
        """
        records = []
        for marc_file in marc_files:
            marc_file_path = pathlib.Path(marc_file)
            if not marc_file_path.exists():
                logger.error(f"MARC file {marc_file_path} not found, skipping")
                continue
            with marc_file_path.open('rb') as fo:
                marc_reader = pymarc.MARCReader(fo)
                for record in marc_reader:
                    # MARCReader yields None for a record it cannot parse;
                    # passing that on would fail when the record's fields
                    # are read
                    if record is None:
                        logger.error(
                            f"Unreadable MARC record in {marc_file_path}: {marc_reader.current_exception}"
                        )
                        continue
                    records.append((record, str(marc_file_path)))
        return records

    def __extract_control_number_035__(
        self, oclc_put_result: bytes
    ) -> Union[str, None]:
        """
        Extracts new OCLC number from 035 record

        Returns the number from the first 035 subfield a that matches
        OCLC_REGEX, or None when no subfield matches.
        """
        oclc_record = pymarc.Record(data=oclc_put_result)  # type: ignore
        for field in oclc_record.get_fields('035'):
            for subfield in field.get_subfields("a"):
                matched_oclc = OCLC_REGEX.match(subfield)
                if matched_oclc:
                    # Group 2 is the number that follows the prefix
                    return matched_oclc.group(2)
        return None

    def __update_oclc_number__(self, control_number: str, record: Record) -> Record:
        """
        Makes sure the record has an 035 subfield a holding control_number
        with the (OCoLC-M) prefix.

        An existing 035 with the same number and a plain (OCoLC) prefix is
        rewritten to (OCoLC-M); when no 035 holds the number a new
        (OCoLC-M) 035 is added. The record is modified in place and
        returned.
        """
        needs_new_035 = True
        for field in record.get_fields('035'):
            for i, subfield in enumerate(field.subfields):
                if subfield.code != "a":
                    continue
                matched_oclc = OCLC_REGEX.match(subfield.value)
                if not matched_oclc:
                    continue
                suffix, oclc_number = matched_oclc.groups()
                # Only an 035 that already holds this number counts
                if oclc_number != control_number:
                    continue
                if suffix is None:
                    # Change prefix to include -M
                    new_prefix = subfield.value.replace("(OCoLC)", "(OCoLC-M)")
                    field.subfields.pop(i)
                    field.add_subfield(code="a", value=new_prefix, pos=i)
                needs_new_035 = False
                # Done with this field; later 035 fields are still checked
                break
        if needs_new_035:
            new_035 = pymarc.Field(
                tag='035',
                indicators=[' ', ' '],  # type: ignore
                subfields=[
                    pymarc.Subfield(code='a', value=f"(OCoLC-M){control_number}")
                ],
            )
            record.add_ordered_field(new_035)
        return record

    def __oclc_operations__(self, **kwargs) -> dict:
        """
        Shared driver for the public operations.

        Keyword arguments:
          marc_files      -- list of MARC file paths
          function        -- callable run once per record with the keyword
                             arguments session, output, record, file_name,
                             instance_uuid, successes and failures
          no_recs_message -- message logged when marc_files is empty

        Records without an instance UUID are skipped. A
        WorldcatRequestError raised by the callable is recorded as a
        failure for that record and processing continues.
        """
        marc_files: List[str] = kwargs['marc_files']
        function: Callable = kwargs['function']
        no_recs_message: str = kwargs.get("no_recs_message", "")
        # "archive" is always present so callers can read it even when
        # there was nothing to process
        output: dict = {"success": [], "failures": [], "archive": []}

        if not marc_files:
            logger.info(no_recs_message)
            return output

        marc_records = self.__read_marc_files__(marc_files)

        successful_files: set = set()
        failed_files: set = set()

        with MetadataSession(authorization=self.oclc_token, timeout=30) as session:
            for record, file_name in marc_records:
                instance_uuid = get_instance_uuid(record)
                if instance_uuid is None:
                    continue
                try:
                    function(
                        session=session,
                        output=output,
                        record=record,
                        file_name=file_name,
                        instance_uuid=instance_uuid,
                        successes=successful_files,
                        failures=failed_files,
                    )
                except WorldcatRequestError as e:
                    msg = f"Instance UUID {instance_uuid} Error: {e}"
                    logger.error(msg)
                    output['failures'].append(
                        {
                            "uuid": instance_uuid,
                            "reason": "WorldcatRequest Error",
                            "context": str(e),
                        }
                    )
                    failed_files.add(file_name)
        # A file is archived only when none of its records failed
        output["archive"] = list(successful_files.difference(failed_files))
        return output

    def __test_oclc_numbers__(self, oclc_numbers: list, instance_uuid: str):
        """
        Returns None when exactly one OCLC number is given, otherwise logs
        and returns a failure payload for the missing or multiple numbers.
        """
        if len(oclc_numbers) == 1:
            return None

        if len(oclc_numbers) == 0:
            msg = "Missing OCLC number"
            context = None
        else:
            msg = "Multiple OCLC ids"
            context = sorted(oclc_numbers)  # type: ignore

        logger.error(msg)
        return {"uuid": instance_uuid, "reason": msg, "context": context}

    def delete(self, marc_files: List[str]) -> dict:
        """
        Unsets the OCLC holdings for the single OCLC number of each record.
        """

        def __delete_oclc__(**kwargs):
            session: MetadataSession = kwargs["session"]
            output: dict = kwargs["output"]
            record: pymarc.Record = kwargs["record"]
            instance_uuid: str = kwargs["instance_uuid"]
            file_name: str = kwargs["file_name"]
            successes: set = kwargs["successes"]
            failures: set = kwargs["failures"]

            oclc_id = get_record_id(record)

            error_payload = self.__test_oclc_numbers__(oclc_id, instance_uuid)

            if error_payload:
                output['failures'].append(error_payload)
                failures.add(file_name)
                return

            response = session.holdings_unset(oclcNumber=oclc_id[0])
            # The parsed body gets its own name so the response object is
            # never mistaken for it; a falsy response is kept as-is so it
            # still shows up as the failure context
            payload = response.json() if response else response

            if payload and payload.get('success'):
                logger.info(f"Matched {instance_uuid} result {payload}")
                output['success'].append(instance_uuid)
                successes.add(file_name)
            else:
                msg = "Failed holdings_unset"
                logger.info(f"{msg} for {instance_uuid} OCLC response: {payload}")
                output['failures'].append(
                    {"uuid": instance_uuid, "reason": msg, "context": payload}
                )
                failures.add(file_name)

        output = self.__oclc_operations__(
            marc_files=marc_files,
            function=__delete_oclc__,
            no_recs_message="No marc records for deletes",
        )
        return output

    def match(self, marc_files: List[str]) -> dict:
        """
        Matches each record against OCLC, adds the matched OCLC number to
        the record, saves the record to FOLIO and then sets the holdings.
        """

        def __match_oclc__(**kwargs):
            session: MetadataSession = kwargs["session"]
            output: dict = kwargs["output"]
            record: pymarc.Record = kwargs["record"]
            instance_uuid: str = kwargs["instance_uuid"]
            file_name: str = kwargs["file_name"]
            failures: set = kwargs["failures"]
            successes: set = kwargs["successes"]

            def record_failure(reason, context):
                # Every failure also keeps the file out of the archive list
                output['failures'].append(
                    {"uuid": instance_uuid, "reason": reason, "context": context}
                )
                failures.add(file_name)

            # The excluded tags are removed from a copy so the record that
            # is later saved to FOLIO keeps them
            export_record = copy.deepcopy(record)
            export_record.remove_fields(*oclc_excluded)
            marc21 = export_record.as_marc21()

            matched_record_result = session.bib_match(
                record=marc21,
                recordFormat="application/marc",
            )
            matched_record = matched_record_result.json()
            logger.info(f"Matched Record Result {matched_record}")
            if matched_record['numberOfRecords'] < 1:
                record_failure("Match failed", matched_record)
                return

            # Use first brief record's oclcNumber to add to existing MARC
            # record
            control_number = matched_record['briefRecords'][0]['oclcNumber']

            modified_marc_record = self.__update_oclc_number__(control_number, record)

            if not self.__put_folio_record__(instance_uuid, modified_marc_record):
                record_failure("FOLIO failed to Add OCLC number", control_number)
                return

            # Sets holdings using the OCLC number
            update_holding_result = session.holdings_set(oclcNumber=control_number)

            if update_holding_result is not None:
                update_holding_result = update_holding_result.json()

            if update_holding_result and update_holding_result['success']:
                logger.info(
                    f"Sets new holdings for {instance_uuid} OCLC {update_holding_result}"
                )
                output['success'].append(instance_uuid)
                successes.add(file_name)
                return

            record_failure(
                "Failed to update holdings after match", update_holding_result
            )

        output = self.__oclc_operations__(
            marc_files=marc_files,
            function=__match_oclc__,
            no_recs_message="No new marc records",
        )
        return output

    def new(self, marc_files: List[str]) -> dict:
        """
        Creates a new OCLC record for each MARC record, adds the new OCLC
        number to the record, saves the record to FOLIO and then sets the
        holdings.
        """

        def __add_update_control_number__(**kwargs) -> bool:
            """
            Saves the OCLC number to FOLIO and sets the holdings. Returns
            True on success; otherwise records exactly one failure, marks
            the file as failed and returns False.
            """
            session: MetadataSession = kwargs["session"]
            control_number: str = kwargs["control_number"]
            record: pymarc.Record = kwargs["record"]
            output: dict = kwargs["output"]
            instance_uuid: str = kwargs["instance_uuid"]
            file_name: str = kwargs["file_name"]
            failures: set = kwargs["failures"]

            modified_marc_record = self.__update_oclc_number__(control_number, record)

            if not self.__put_folio_record__(instance_uuid, modified_marc_record):
                output['failures'].append(
                    {
                        "uuid": instance_uuid,
                        "reason": "FOLIO failed to Add OCLC number",
                        "context": control_number,
                    }
                )
                # Keeps the file out of the archive list, as match and
                # update do for the same failure
                failures.add(file_name)
                return False

            # Sets holdings using the OCLC number
            new_holding_result = session.holdings_set(oclcNumber=control_number)
            payload = new_holding_result.json() if new_holding_result else None

            if payload and payload.get('success'):
                logger.info(f"Sets new holdings for {instance_uuid} OCLC {payload}")
                output['success'].append(instance_uuid)
                return True

            # FOLIO was updated, so this is reported as a holdings failure
            # and not as a FOLIO failure
            output['failures'].append(
                {
                    "uuid": instance_uuid,
                    "reason": "Failed to update holdings for new record",
                    "context": payload,
                }
            )
            failures.add(file_name)
            return False

        def __new_oclc__(**kwargs):
            session: MetadataSession = kwargs["session"]
            output: dict = kwargs["output"]
            record: pymarc.Record = kwargs["record"]
            instance_uuid: str = kwargs["instance_uuid"]
            file_name: str = kwargs["file_name"]
            successes: set = kwargs["successes"]
            failures: set = kwargs["failures"]

            export_record = copy.deepcopy(record)
            export_record.remove_fields(*oclc_excluded)

            marc21 = export_record.as_marc21()

            # We want to capture errors from the OCLC response instead of
            # trying to parse the WorldcatRequestError
            bib_create_result = self.httpx_client.post(
                session._url_manage_bibs_create(),
                headers={
                    "Accept": "application/marc",
                    "content-type": "application/marc",
                    "Authorization": f"Bearer {session.authorization.token_str}",
                },
                data=marc21,
            )

            logger.info(
                f"New record result {bib_create_result.status_code} {bib_create_result.content}"
            )

            if bib_create_result.status_code != 201:
                # The error body is kept as parsed JSON when possible and
                # as plain text otherwise
                try:
                    context = bib_create_result.json()
                except json.decoder.JSONDecodeError:
                    context = bib_create_result.text

                output['failures'].append(
                    {
                        "uuid": instance_uuid,
                        "reason": "Failed to add new MARC record",
                        "context": context,
                    }
                )
                failures.add(file_name)
                return

            control_number = self.__extract_control_number_035__(
                bib_create_result.content
            )
            if control_number is None:
                output['failures'].append(
                    {
                        "uuid": instance_uuid,
                        "reason": "Failed to extract OCLC number",
                        "context": None,
                    }
                )
                failures.add(file_name)
                return

            # The helper records the success or the failure itself; only
            # the file bookkeeping for a success is left to do here
            if __add_update_control_number__(
                session=session,
                instance_uuid=instance_uuid,
                control_number=control_number,
                output=output,
                record=record,
                failures=failures,
                file_name=file_name,
            ):
                successes.add(file_name)

        output = self.__oclc_operations__(
            marc_files=marc_files,
            function=__new_oclc__,
            no_recs_message="No new marc records",
        )
        return output

    def update(self, marc_files: List[str]) -> dict:
        """
        Sets the OCLC holdings for the single OCLC number of each record
        and saves the control number returned by OCLC to FOLIO.
        """

        def __update_oclc__(**kwargs):
            session: MetadataSession = kwargs["session"]
            output: dict = kwargs["output"]
            record: pymarc.Record = kwargs["record"]
            instance_uuid: str = kwargs["instance_uuid"]
            file_name: str = kwargs["file_name"]
            successes: set = kwargs["successes"]
            failures: set = kwargs["failures"]

            def record_failure(reason, context):
                # Every failure also keeps the file out of the archive list
                output['failures'].append(
                    {"uuid": instance_uuid, "reason": reason, "context": context}
                )
                failures.add(file_name)

            oclc_id = get_record_id(record)
            error_payload = self.__test_oclc_numbers__(oclc_id, instance_uuid)
            if error_payload:
                output['failures'].append(error_payload)
                failures.add(file_name)
                return

            response = session.holdings_set(oclcNumber=oclc_id[0])

            if response is None:
                record_failure("No response from OCLC", None)
                return

            set_payload = response.json()
            if not set_payload['success']:
                record_failure("Failed to update holdings", set_payload)
                return

            control_number = set_payload['controlNumber']

            modified_marc_record = self.__update_oclc_number__(control_number, record)
            if self.__put_folio_record__(instance_uuid, modified_marc_record):
                output['success'].append(instance_uuid)
                successes.add(file_name)
            else:
                record_failure("FOLIO failed to Add OCLC number", control_number)

        output = self.__oclc_operations__(
            marc_files=marc_files,
            function=__update_oclc__,
            no_recs_message="No updated marc records",
        )
        return output
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc