• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rero / rero-mef / 16621609190

30 Jul 2025 11:43AM UTC coverage: 84.491% (+0.008%) from 84.483%
16621609190

push

github

rerowep
chore: update dependencies

Co-Authored-by: Peter Weber <peter.weber@rero.ch>

4560 of 5397 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.64
/rero_mef/marctojson/do_idref_agent.py
1
# RERO MEF
2
# Copyright (C) 2020 RERO
3
#
4
# This program is free software: you can redistribute it and/or modify
5
# it under the terms of the GNU Affero General Public License as published by
6
# the Free Software Foundation, version 3 of the License.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU Affero General Public License for more details.
12
#
13
# You should have received a copy of the GNU Affero General Public License
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
15

16
"""MARC to JSON transformer for IDREF agent records."""
17

18
from datetime import datetime, timezone
1✔
19

20
from rero_mef.marctojson.helper import (
1✔
21
    COUNTRIES,
22
    COUNTRY_UNIMARC_MARC21,
23
    LANGUAGES,
24
    build_string_list_from_fields,
25
    remove_trailing_punctuation,
26
)
27

28
# Maps the two-character code read from positions 4-5 of subfield $7
# (see get_script_code) to the script identifier used to order name forms.
# NOTE(review): the values look like lower-cased ISO 15924 script codes --
# confirm against the cataloguing format documentation.
LANGUAGE_SCRIPT_CODES = {
    "ba": "latn",
    "ca": "cyrl",
    # Three distinct codes share the same "jpan" identifier.
    "da": "jpan",
    "db": "jpan",
    "dc": "jpan",
    "ea": "hani",
    "fa": "arab",
    "ga": "grek",
    "ha": "hebr",
    "ia": "thai",
    "ja": "deva",
    "ka": "kore",
    "la": "taml",
    "ma": "geor",
    "mb": "armn",
}
45

46

47
def get_script_code(field):
    """Return the script identifier encoded in subfield $7 of ``field``.

    Characters 4-5 of ``field["7"]`` are looked up in
    ``LANGUAGE_SCRIPT_CODES``. Any failure along the way (missing subfield,
    unknown code, unusable field object) yields the ``"latn"`` fallback.
    """
    try:
        return LANGUAGE_SCRIPT_CODES[field["7"][4:6]]
    except Exception:
        # Deliberately best effort: every problem means "treat as latin".
        return "latn"
1✔
56

57

58
def build_language_string_list_from_fields(
    record,
    tag,
    subfields,
    punctuation=",",
    spaced_punctuation=":;/-",
    tag_grouping=None,
):
    """Build one display string per ``tag`` field of ``record``.

    :param record: object exposing ``get_fields(tag)``; each returned field
        is iterated as ``(code, data)`` pairs.
    :param tag: field tag to read.
    :param subfields: mapping ``subfield code -> delimiter``; only the listed
        codes are kept, and the delimiter is placed before the subfield text.
    :param punctuation: forwarded to ``remove_trailing_punctuation``.
    :param spaced_punctuation: forwarded to ``remove_trailing_punctuation``.
    :param tag_grouping: optional list of dicts with the keys ``subtags``,
        ``start``, ``end``, ``delimiter`` and ``subdelimiter``; consecutive
        subfields whose code is contained in ``subtags`` are rendered as one
        group wrapped in ``start``/``end``.
    :returns: list of strings; strings of fields whose script code is
        ``"latn"`` are inserted at the front, all others are appended.
    """
    groupings = tag_grouping or []
    results = []
    for field in record.get_fields(tag):
        # Consecutive subfields with the same (possibly grouped) code are
        # collected into one run: (code, [values, ...]).
        runs = []
        for code, data in field:
            if code not in subfields:
                continue
            if isinstance(data, (list, set)):
                data = subfields[code].join(data)
            # Drop the \x98 / \x9c control characters and doubled commas.
            data = data.replace("\x98", "").replace("\x9c", "").replace(",,", ",")
            data = remove_trailing_punctuation(
                data=data,
                punctuation=punctuation,
                spaced_punctuation=spaced_punctuation,
            ).strip()
            if not data:
                continue
            # A code belonging to a grouping is replaced by the grouping's
            # whole "subtags" string so that neighbours merge into one run.
            for grouping in groupings:
                if code in grouping["subtags"]:
                    code = grouping["subtags"]
            if runs and code == runs[-1][0]:
                runs[-1][1].append(data)
            else:
                runs.append((code, [data]))

        text = ""
        for run_code, values in runs:
            opening = ""
            closing = ""
            separator = inner_separator = subfields.get(run_code)
            # No early exit: when several groupings match, the last one wins.
            for grouping in groupings:
                if run_code == grouping["subtags"]:
                    opening = grouping.get("start", "")
                    closing = grouping.get("end", "")
                    separator = grouping.get("delimiter", "")
                    inner_separator = grouping.get("subdelimiter", "")

            if text:
                text += separator + opening + inner_separator.join(values) + closing
            else:
                # The very first run is not preceded by its delimiter.
                text = opening + inner_separator.join(values) + closing

        if text:
            text = text.strip()
            if get_script_code(field) == "latn":
                results.insert(0, text)
            else:
                results.append(text)
    return results
1✔
131

132

133
class Transformation:
    """Transformation UNIMARC to JSON for IDREF authority agents.

    Each method whose name starts with ``trans`` reads some fields of the
    record and fills the matching keys of ``json_dict``. ``_transform``
    discovers these methods through ``dir(self)`` and therefore runs them in
    alphabetical order.
    """

    def __init__(self, marc, logger=None, verbose=False, transform=True):
        """Store the record and optionally transform it right away.

        :param marc: record to transform; must expose ``get_fields(tag)``,
            ``leader`` and item access by tag.
        :param logger: optional logger offering ``info`` and ``warning``.
        :param verbose: log only when this is true and a logger is given.
        :param transform: run all ``trans*`` methods during construction.
        """
        self.marc = marc
        self.logger = logger
        self.verbose = verbose
        self.json_dict = {}
        if transform:
            self._transform()

    def _transform(self):
        """Call the transformation functions.

        Records without a 200 or 210 field are not transformed: only a
        ``NO TRANSFORMATION`` marker and the pid are stored.
        """
        if self.marc.get_fields("200") or self.marc.get_fields("210"):
            for name in dir(self):
                if name.startswith("trans"):
                    getattr(self, name)()
        else:
            msg = "No 200 or 210"
            if self.logger and self.verbose:
                self.logger.warning(f"NO TRANSFORMATION: {msg}")
            self.json_dict = {"NO TRANSFORMATION": msg}
            self.trans_idref_pid()

    def _log_call(self, name):
        """Log that the transformation function ``name`` is running.

        The name is handed over as a lazy %-argument; the format string must
        contain a placeholder for it, otherwise a standard library logger
        fails to render the message.
        """
        if self.logger and self.verbose:
            self.logger.info("Call Function: %s", name)

    @staticmethod
    def _first_value(value):
        """Return the first item when ``value`` is a list, else ``value``."""
        return value[0] if isinstance(value, list) else value

    @staticmethod
    def _bracket_grouping(subtags, subdelimiter):
        """Build a grouping that renders ``subtags`` as `` (x<sep>y)``."""
        return {
            "subtags": subtags,
            "start": " (",
            "end": ")",
            "delimiter": "",
            "subdelimiter": subdelimiter,
        }

    def _name_parameters(self, person_tag, organisation_tag):
        """Return ``(tag, subfields, tag_grouping)`` for building names.

        The organisation variant is selected as soon as the record contains
        at least one ``organisation_tag`` field. Fresh containers are built
        on every call so callers never share mutable state.
        """
        if self.marc.get_fields(organisation_tag):
            subfields = {"a": ", ", "b": ". ", "c": ", "}
            return organisation_tag, subfields, [self._bracket_grouping("c", ", ")]
        return person_tag, {"a": ", ", "b": ", ", "c": ", "}, []

    def _access_point_parameters(self, person_tag, organisation_tag):
        """Return ``(tag, subfields, tag_grouping)`` for access points.

        The organisation variant is selected as soon as the record contains
        at least one ``organisation_tag`` field.
        """
        if self.marc.get_fields(organisation_tag):
            subfields = {
                "a": ", ",
                "b": ". ",
                "c": ", ",
                "d": " ; ",
                "e": " ; ",
                "f": " ; ",
                # Same delimiter for 210, 410 and 710 (210 used to miss the
                # trailing blank, unlike every other table).
                "x": " - - ",
            }
            tag_grouping = [
                self._bracket_grouping("c", ", "),
                self._bracket_grouping("def", " ; "),
            ]
            return organisation_tag, subfields, tag_grouping
        subfields = {
            "a": ", ",
            "b": ", ",
            "c": ", ",
            "d": ", ",
            "f": ", ",
            "x": " - - ",
        }
        return person_tag, subfields, []

    @staticmethod
    def _format_103_date(date_str):
        """Format a 103 date ``YYYY[MM[DD]][?]`` as ``YYYY[-MM[-DD]][?]``.

        Blanks are removed first; ``None`` is returned when nothing is left.
        """
        date_str = date_str.strip().replace(" ", "")
        if not date_str:
            return None
        unknown = date_str.endswith("?")
        if unknown:
            date_str = date_str[:-1]
        parts = (date_str[:4], date_str[4:6], date_str[6:8])
        date = "-".join(part for part in parts if part)
        if unknown:
            date += "?"
        return date or None

    @staticmethod
    def _format_200_date(date_str):
        """Remove blanks from a 200 $f date; ``"...."`` yields ``None``."""
        date_formated = date_str.replace(" ", "")
        return None if date_formated == "...." else date_formated

    @property
    def json(self):
        """Json data, or ``None`` when nothing has been produced."""
        return self.json_dict or None

    def trans_idref_deleted(self):
        """Transformation deleted leader 5 == d."""
        self._log_call("trans_idref_deleted")
        if self.marc.leader[5] == "d":
            self.json_dict["deleted"] = datetime.now(timezone.utc).isoformat()

    def trans_idref_relation_pid(self):
        """Transformation old pids 035 $a $9 = sudoc.

        A 035 with ``$9 == "sudoc"`` becomes the ``relation_pid``; a 035
        carrying a source in ``$2`` is added to ``identifiedBy``. Fields
        without a value in ``$a`` are skipped.
        """
        self._log_call("trans_idref_relation_pid")
        for field_035 in self.marc.get_fields("035"):
            subfield_a = self._first_value(field_035.get("a"))
            subfield_2 = self._first_value(field_035.get("2"))
            subfield_9 = self._first_value(field_035.get("9"))
            if not subfield_a:
                # Nothing to store; also avoids calling str methods on None.
                continue
            if subfield_9 == "sudoc":
                self.json_dict["relation_pid"] = {
                    "value": subfield_a,
                    "type": "redirect_from",
                }
            elif subfield_2:
                identified_by = self.json_dict.get("identifiedBy", [])
                identified_by.append(
                    {
                        "source": subfield_2.upper(),
                        "type": "uri" if subfield_a.startswith("http") else "bf:Nbn",
                        "value": subfield_a,
                    }
                )
                self.json_dict["identifiedBy"] = identified_by

    def trans_idref_gender(self):
        """Transformation gender 120 $a a:female, b: male, -:not known."""
        self._log_call("trans_idref_gender")
        genders = {"a": "female", "b": "male", "-": "not known"}
        if fields_120 := self.marc.get_fields("120"):
            # Only the first 120 field is considered; unknown codes are ignored.
            if gender := genders.get(fields_120[0].get("a")):
                self.json_dict["gender"] = gender

    def trans_idref_language(self):
        """Transformation language 101 $a.

        Keeps the $a values of the first 101 field that are in ``LANGUAGES``.
        """
        self._log_call("trans_idref_language")
        if (fields_101 := self.marc.get_fields("101")) and (
            language_list := [
                language
                for language in fields_101[0].get_subfields("a")
                if language in LANGUAGES
            ]
        ):
            self.json_dict["language"] = language_list

    def trans_idref_pid(self):
        """Transformation pid from field 001."""
        self._log_call("trans_idref_pid")
        if fields_001 := self.marc.get_fields("001"):
            self.json_dict["pid"] = fields_001[0].data

    def trans_idref_identifier(self):
        """Transformation identifier from field 003."""
        self._log_call("trans_idref_identifier")
        if fields_003 := self.marc.get_fields("003"):
            identified_by = self.json_dict.get("identifiedBy", [])
            identified_by.append(
                {"source": "IDREF", "type": "uri", "value": fields_003[0].data}
            )
            self.json_dict["identifiedBy"] = identified_by

    def trans_idref_birth_and_death_dates(self):
        """Transformation birth_date and death_date.

        Dates come from the first 103 field ($a / $b); only when the record
        has no 103 field at all is 200 $f (``start-end``) used instead. For
        records with a 210 field the keys are ``date_of_establishment`` and
        ``date_of_termination``.
        """
        self._log_call("trans_idref_birth_and_death_dates")
        birth_date = ""
        death_date = ""
        if fields_103 := self.marc.get_fields("103"):
            if fields_103[0].get("a"):
                birth_date = self._format_103_date(fields_103[0]["a"])
            if fields_103[0].get("b"):
                death_date = self._format_103_date(fields_103[0]["b"])
        elif fields_200 := self.marc.get_fields("200"):
            if fields_200[0].get("f"):
                dates = fields_200[0]["f"].split("-")
                birth_date = self._format_200_date(dates[0])
                if len(dates) > 1:
                    death_date = self._format_200_date(dates[1])

        start_date_name = "date_of_birth"
        end_date_name = "date_of_death"
        if self.marc.get_fields("210"):
            start_date_name = "date_of_establishment"
            end_date_name = "date_of_termination"
        if birth_date:
            self.json_dict[start_date_name] = birth_date
        if death_date:
            self.json_dict[end_date_name] = death_date

    def trans_idref_biographical_information(self):
        """Transformation biographical_information 300 $a 34x $a."""
        self._log_call("trans_idref_biographical_information")
        subfields = {"a": ", "}
        biographical_information = []
        for tag in (300, *range(340, 349 + 1)):  # 300, 340:349
            biographical_information += build_string_list_from_fields(
                self.marc, str(tag), subfields
            )
        if biographical_information:
            self.json_dict["biographical_information"] = biographical_information

    def trans_idref_numeration(self):
        """Transformation numeration 200 $d."""
        self._log_call("trans_idref_numeration")
        numeration = build_string_list_from_fields(self.marc, "200", {"d": " "})
        if numeration and numeration[0]:
            self.json_dict["numeration"] = numeration[0]

    def trans_idref_qualifier(self):
        """Transformation qualifier 200 $c."""
        self._log_call("trans_idref_qualifier")
        qualifier = build_string_list_from_fields(self.marc, "200", {"c": " "})
        if qualifier and qualifier[0]:
            self.json_dict["qualifier"] = qualifier[0]

    def trans_idref_preferred_name(self):
        """Transformation preferred_name 200/210.

        The first name becomes ``preferred_name``; any further name is added
        to ``variant_name``.
        """
        self._log_call("trans_idref_preferred_name")
        tag, subfields, tag_grouping = self._name_parameters("200", "210")
        variant_names = self.json_dict.get("variant_name", [])
        preferred_names = build_language_string_list_from_fields(
            record=self.marc, tag=tag, subfields=subfields, tag_grouping=tag_grouping
        )
        for preferred_name in preferred_names:
            if self.json_dict.get("preferred_name"):
                variant_names.append(preferred_name)
            else:
                self.json_dict["preferred_name"] = preferred_name

        if variant_names:
            self.json_dict["variant_name"] = variant_names

    def trans_idref_authorized_access_point(self):
        """Transformation authorized_access_point. 200/210.

        Also sets ``type`` and, for 210 records, ``conference`` (true when
        the first indicator of the 210 field is ``"1"``). The first access
        point becomes ``authorized_access_point``; further ones are added to
        ``variant_access_point``.
        """
        self._log_call("trans_idref_authorized_access_point")
        tag, subfields, tag_grouping = self._access_point_parameters("200", "210")
        agent = "bf:Person"
        if tag == "210":
            agent = "bf:Organisation"
            self.json_dict["conference"] = self.marc["210"].indicators[0] == "1"

        variant_access_points = self.json_dict.get("variant_access_point", [])
        authorized_access_points = build_language_string_list_from_fields(
            record=self.marc, tag=tag, subfields=subfields, tag_grouping=tag_grouping
        )
        for authorized_access_point in authorized_access_points:
            self.json_dict["type"] = agent
            if self.json_dict.get("authorized_access_point"):
                variant_access_points.append(authorized_access_point)
            else:
                self.json_dict["authorized_access_point"] = authorized_access_point
        if variant_access_points:
            self.json_dict["variant_access_point"] = variant_access_points

    def trans_idref_variant_name(self):
        """Transformation variant_name 400/410."""
        self._log_call("trans_idref_variant_name")
        tag, subfields, tag_grouping = self._name_parameters("400", "410")
        variant_names = self.json_dict.get("variant_name", [])
        if variant_name := build_string_list_from_fields(
            record=self.marc, tag=tag, subfields=subfields, tag_grouping=tag_grouping
        ):
            variant_names += variant_name
        if variant_names:
            self.json_dict["variant_name"] = variant_names

    def trans_idref_variant_access_point(self):
        """Transformation variant_access_point 400/410.

        The 4xx access points are added after any variant already stored by
        ``trans_idref_authorized_access_point`` instead of replacing them.
        """
        self._log_call("trans_idref_variant_access_point")
        tag, subfields, tag_grouping = self._access_point_parameters("400", "410")
        if variant_access_point := build_string_list_from_fields(
            record=self.marc, tag=tag, subfields=subfields, tag_grouping=tag_grouping
        ):
            self.json_dict["variant_access_point"] = (
                self.json_dict.get("variant_access_point", []) + variant_access_point
            )

    def trans_idref_parallel_access_point(self):
        """Transformation parallel_access_point 700/710."""
        self._log_call("trans_idref_parallel_access_point")
        tag, subfields, tag_grouping = self._access_point_parameters("700", "710")
        if parallel_access_point := build_string_list_from_fields(
            record=self.marc, tag=tag, subfields=subfields, tag_grouping=tag_grouping
        ):
            self.json_dict["parallel_access_point"] = parallel_access_point

    def trans_idref_country_associated(self):
        """Transformation country_associated 102 $a codes ISO 3166-1.

        The $a code of the first 102 field is translated through
        ``COUNTRY_UNIMARC_MARC21`` and kept only when ``COUNTRIES`` has a
        truthy entry for it.
        """
        self._log_call("trans_idref_country_associated")
        if (fields_102 := self.marc.get_fields("102")) and (
            code := fields_102[0].get("a")
        ):
            country = COUNTRY_UNIMARC_MARC21.get(code)
            if COUNTRIES.get(country):
                self.json_dict["country_associated"] = country
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc