• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rero / rero-mef / 16621609190

30 Jul 2025 11:43AM UTC coverage: 84.491% (+0.008%) from 84.483%
16621609190

push

github

rerowep
chore: update dependencies

Co-Authored-by: Peter Weber <peter.weber@rero.ch>

4560 of 5397 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.39
/rero_mef/marctojson/do_gnd_agent.py
1
# RERO MEF
2
# Copyright (C) 2020 RERO
3
#
4
# This program is free software: you can redistribute it and/or modify
5
# it under the terms of the GNU Affero General Public License as published by
6
# the Free Software Foundation, version 3 of the License.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU Affero General Public License for more details.
12
#
13
# You should have received a copy of the GNU Affero General Public License
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
15

16
"""Marctojsons transformer for GND records."""
17
# https://www.dnb.de/EN/Professionell/Metadatendienste/Datenbezug/GND_Aenderungsdienst/gndAenderungsdienst_node.html
18

19
import re
1✔
20
from datetime import datetime, timezone
1✔
21

22
from rero_mef.marctojson.helper import (
1✔
23
    COUNTRIES,
24
    COUNTRY_UNIMARC_MARC21,
25
    LANGUAGES,
26
    build_string_list_from_fields,
27
)
28

29
# Human-readable labels for punctuation-policy codes.
# NOTE(review): presumably keyed by the code at leader position 18 — confirm.
PUNCTUATION_POLICY = {
    "#": "No information provided",
    "c": "Punctuation omitted",
    "i": "Punctuation included",
    "u": "Unknown",
}
35

36
# Maps the GND entity code (075 $b with $2 "gndgen") to the type used in the
# JSON output.
RECORD_TYPES = {
    "p": "bf:Person",
    # Both "b" and "f" map to an organisation.
    "b": "bf:Organisation",
    "f": "bf:Organisation",
    "g": "bf:Place",
    "s": "bf:Topic",
    "u": "bf:Title",
}
44

45

46
class Transformation:
1✔
47
    """Transformation MARC21 to JSON for GND authority person."""
48

49
    def __init__(self, marc, logger=None, verbose=False, transform=True):
1✔
50
        """Constructor."""
51
        self.marc = marc
1✔
52
        self.logger = logger
1✔
53
        self.verbose = verbose
1✔
54
        self.json_dict = {}
1✔
55
        self.punctuation = marc.leader[18]
1✔
56
        if transform:
1✔
57
            self._transform()
1✔
58

59
    def get_type(self):
        """Get type of record.

        GND entities (record types) 075 $b TYPE $2 gndgen
        - b corporate body
        - f conference
        - g geographic entity
        - n person (not individualised)
        - p person (individualised)
        - s subject term
        - u work

        :returns: the ``RECORD_TYPES`` value for $b of the first 075 field
            whose $2 is ``gndgen``; ``None`` when there is no such field, $b
            is missing, or its code is not in ``RECORD_TYPES``.
        """
        for field_075 in self.marc.get_fields("075") or []:
            if field_075.get("2") == "gndgen":
                # Use ``get`` so a gndgen field without $b yields ``None``
                # instead of raising on the subscript lookup.
                return RECORD_TYPES.get(field_075.get("b"))
        return None
75

76
    def _transform(self):
1✔
77
        """Call the transformation functions."""
78
        record_type = self.get_type()
1✔
79
        if record_type in {"bf:Person", "bf:Organisation"}:
1✔
80
            if self.marc.get_fields("100", "110", "111"):
1✔
81
                for func in dir(self):
1✔
82
                    if func.startswith("trans"):
1✔
83
                        func = getattr(self, func)
1✔
84
                        func()
1✔
85
            else:
86
                msg = "No 100 or 110 or 111"
1✔
87
                if self.logger and self.verbose:
1✔
88
                    self.logger.warning(f"NO TRANSFORMATION: {msg}")
×
89
                self.json_dict = {"NO TRANSFORMATION": msg}
1✔
90
                self.trans_gnd_pid()
1✔
91
        else:
92
            msg = f"Not a person or organisation: {record_type}"
1✔
93
            if self.logger and self.verbose:
1✔
94
                self.logger.warning(f"NO TRANSFORMATION: {msg}")
×
95
            self.json_dict = {"NO TRANSFORMATION": msg}
1✔
96
            self.trans_gnd_pid()
1✔
97

98
    @property
1✔
99
    def json(self):
1✔
100
        """Json data."""
101
        return self.json_dict or None
1✔
102

103
    def trans_gnd_deleted(self):
1✔
104
        """Transformation deleted leader 5.
105

106
        $c: Redirect notification
107
        $x: Redirect
108
        $c: Deletion notification
109
        $d: Deletion
110

111
        https://www.dnb.de/EN/Professionell/Metadatendienste/Datenbezug/
112
        GND_Aenderungsdienst/gndAenderungsdienst_node.html
113
        """
114
        if self.logger and self.verbose:
1✔
115
            self.logger.info("Call Function", "trans_gnd_deleted")
1✔
116
        if self.marc.leader[5] in ["c", "d", "x"]:
1✔
117
            self.json_dict["deleted"] = datetime.now(timezone.utc).isoformat()
1✔
118

119
    def trans_gnd_relation_pid(self):
1✔
120
        """Transformation relation pids 682 $0.
121

122
        https://www.dnb.de/EN/Professionell/Metadatendienste/Datenbezug/
123
        GND_Aenderungsdienst/gndAenderungsdienst_node.html
124
        """
125
        if self.logger and self.verbose:
1✔
126
            self.logger.info("Call Function", "trans_gnd_relation_pid")
1✔
127
        fields_035 = self.marc.get_fields("682")
1✔
128
        for field_035 in fields_035:
1✔
129
            if field_035.get("i") and field_035["i"] == "Umlenkung":
1✔
130
                subfields_0 = field_035.get_subfields("0")
1✔
131
                for subfield_0 in subfields_0:
1✔
132
                    if subfield_0.startswith("(DE-101)"):
1✔
133
                        self.json_dict["relation_pid"] = {
1✔
134
                            "value": subfield_0.replace("(DE-101)", ""),
135
                            "type": "redirect_to",
136
                        }
137

138
    def trans_gnd_gender(self):
1✔
139
        """Transform gender 375 $a 1 = male, 2 = female, " " = not known."""
140
        if self.logger and self.verbose:
1✔
141
            self.logger.info("Call Function", "trans_gnd_gender")
1✔
142
        gender = ""
1✔
143
        if fields_375 := self.marc.get_fields("375"):
1✔
144
            if fields_375[0].get("a"):
1✔
145
                gender_type = fields_375[0]["a"]
1✔
146
                if gender_type == "2":
1✔
147
                    gender = "female"
1✔
148
                elif gender_type == "1":
1✔
149
                    gender = "male"
1✔
150
                elif gender_type == " ":
1✔
151
                    gender = "not known"
1✔
152
            if gender:
1✔
153
                self.json_dict["gender"] = gender
1✔
154

155
    def trans_gnd_language(self):
1✔
156
        """Transformation language 377 $a."""
157
        if self.logger and self.verbose:
1✔
158
            self.logger.info("Call Function", "trans_language")
1✔
159
        if (field_377 := self.marc.get_fields("377")) and (
1✔
160
            language_list := [
161
                language
162
                for language in field_377[0].get_subfields("a")
163
                if language in LANGUAGES
164
            ]
165
        ):
166
            self.json_dict["language"] = language_list
1✔
167

168
    def trans_gnd_pid(self):
1✔
169
        """Transformation pid from field 001."""
170
        if self.logger and self.verbose:
1✔
171
            self.logger.info("Call Function", "trans_gnd_pid")
1✔
172
        if field_001 := self.marc.get_fields("001"):
1✔
173
            self.json_dict["pid"] = field_001[0].data
1✔
174

175
    def trans_gnd_identifier(self):
1✔
176
        """Transformation identifier from field 001."""
177
        if self.logger and self.verbose:
1✔
178
            self.logger.info("Call Function", "trans_gnd_identifier")
1✔
179
        fields_024 = self.marc.get_fields("024")
1✔
180
        for field_024 in fields_024:
1✔
181
            subfields_0 = field_024.get("0")
1✔
182
            subfields_2 = field_024.get("2")
1✔
183
            if subfields_0 and subfields_2 == "gnd":
1✔
184
                self.json_dict.setdefault("identifiedBy", []).append(
1✔
185
                    {"type": "uri", "value": subfields_0, "source": "GND"}
186
                )
187

188
    def trans_gnd_birth_and_death_dates(self):
1✔
189
        """Transformation birth_date and death_date."""
190

191
        def format_100_date(date_str):
1✔
192
            """Format date from field 100."""
193
            date_formated = date_str
1✔
194
            if len(date_str) == 8:
1✔
195
                date_formated = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}"
1✔
196
            elif len(date_str) == 4:
1✔
197
                date_formated = date_str[:4]
1✔
198
            return date_formated
1✔
199

200
        def format_548_date(date_str):
1✔
201
            """Format date from field 548."""
202
            date_formated = date_str
1✔
203
            if len(date_str) == 4:
1✔
204
                date_formated = date_str[:4]
1✔
205
            return date_formated
1✔
206

207
        def get_date(dates_per_tag, selector):
1✔
208
            """Get date base on selector ('birth_date' or 'death_date')."""
209
            if "datx" in dates_per_tag and selector in dates_per_tag["datx"]:
1✔
210
                return dates_per_tag["datx"][selector]
1✔
211
            if "datl" in dates_per_tag and selector in dates_per_tag["datl"]:
1✔
212
                return dates_per_tag["datl"][selector]
1✔
213
            if "datb" in dates_per_tag and selector in dates_per_tag["datb"]:
1✔
214
                return dates_per_tag["datb"][selector]
×
215
            if "100" in dates_per_tag and selector in dates_per_tag["100"]:
1✔
216
                return dates_per_tag["100"][selector]
1✔
217
            return None
1✔
218

219
        if self.logger and self.verbose:
1✔
220
            self.logger.info("Call Function", "trans_gnd_birth_and_death_dates")
1✔
221
        dates_per_tag = {}
1✔
222
        if fields_100 := self.marc.get_fields("100"):
1✔
223
            if fields_100[0].get("d"):
1✔
224
                field_100_d = fields_100[0]["d"]
1✔
225
                dates_string = re.sub(r"\s+", " ", field_100_d).strip()
1✔
226
                dates = dates_string.split("-")
1✔
227
                dates_per_tag["100"] = {}
1✔
228
                dates_per_tag["100"]["birth_date"] = format_100_date(dates[0])
1✔
229
                if len(dates) > 1:
1✔
230
                    death_date = format_100_date(dates[1])
1✔
231
                    dates_per_tag["100"]["death_date"] = format_100_date(dates[1])
1✔
232

233
        for field_548 in self.marc.get_fields("548"):
1✔
234
            if (
1✔
235
                field_548.get("a")
236
                and field_548.get("4")
237
                and field_548["4"] in ("datl", "datx", "datb")
238
            ):
239
                dates = field_548["a"].split("-")
1✔
240
                if birth_date := format_548_date(dates[0]):
1✔
241
                    dates_per_tag.setdefault(field_548["4"], {})
1✔
242
                    dates_per_tag[field_548["4"]]["birth_date"] = birth_date
1✔
243
                if len(dates) > 1:
1✔
244
                    if death_date := format_548_date(dates[1]):
1✔
245
                        dates_per_tag.setdefault(field_548["4"], {})
1✔
246
                        dates_per_tag[field_548["4"]]["death_date"] = death_date
1✔
247

248
        if self.marc.get_fields("110") or self.marc.get_fields("111"):
1✔
249
            date_of_establishment = get_date(dates_per_tag, "birth_date")
1✔
250
            date_of_termination = get_date(dates_per_tag, "death_date")
1✔
251
            dates_per_tag.pop("100", None)
1✔
252
            dates_per_tag.pop("datl", None)
1✔
253
            dates_per_tag.pop("datx", None)
1✔
254
            if date_of_establishment:
1✔
255
                self.json_dict["date_of_establishment"] = date_of_establishment
1✔
256
            if date_of_termination:
1✔
257
                self.json_dict["date_of_termination"] = date_of_termination
1✔
258
        else:
259
            dates_per_tag.pop("datb", None)
1✔
260
            birth_date = get_date(dates_per_tag, "birth_date")
1✔
261
            if birth_date:
1✔
262
                self.json_dict["date_of_birth"] = birth_date
1✔
263
            death_date = get_date(dates_per_tag, "death_date")
1✔
264
            if death_date:
1✔
265
                self.json_dict["date_of_death"] = death_date
1✔
266

267
    def trans_gnd_biographical_information(self):
        """Transformation biographical_information 678 $abu.

        The strings built from the 678 fields are stored as a list under
        ``biographical_information`` when there is at least one.
        """
        if self.logger and self.verbose:
            self.logger.info("Call Function", "trans_gnd_biographical_information")
        separators = {"a": ", ", "b": ", ", "u": ", "}
        biographical_information = list(
            build_string_list_from_fields(self.marc, "678", separators)
        )
        if not biographical_information:
            return
        self.json_dict["biographical_information"] = biographical_information
279

280
    def trans_gnd_numeration(self):
        """Transformation numeration 100 $b.

        Only the first string built from the 100 fields is kept, and only
        when it is non-empty.
        """
        if self.logger and self.verbose:
            self.logger.info("Call Function", "trans_gnd_numeration")
        values = build_string_list_from_fields(self.marc, "100", {"b": " "})
        if not values:
            return
        first = values[0]
        if first:
            self.json_dict["numeration"] = first
288

289
    def trans_gnd_qualifier(self):
        """Transformation qualifier 100 $c.

        Only the first string built from the 100 fields is kept, and only
        when it is non-empty.
        """
        if self.logger and self.verbose:
            self.logger.info("Call Function", "trans_gnd_qualifier")
        values = build_string_list_from_fields(self.marc, "100", {"c": " "})
        if not values:
            return
        first = values[0]
        if first:
            self.json_dict["qualifier"] = first
297

298
    def trans_gnd_conference(self):
1✔
299
        """Transformation conference. false: 075 $b = b true: 075 $b = f."""
300
        if self.logger and self.verbose:
1✔
301
            self.logger.info("Call Function", "trans_gnd_conference")
1✔
302
        if self.marc.get_fields("110") or self.marc.get_fields("111"):
1✔
303
            if field_075 := self.marc.get("075"):
1✔
304
                if subfields_b := field_075.get("b"):
1✔
305
                    if subfields_b[0] == "f":
1✔
306
                        self.json_dict["conference"] = True
1✔
307
                    elif subfields_b[0] == "b":
1✔
308
                        self.json_dict["conference"] = False
1✔
309

310
    def trans_gnd_preferred_name(self):
        """Transformation preferred_name 100/110/111.

        Records with a 110 or 111 field use those tags (110 first); all
        others use 100. For each tag the first built string becomes
        ``preferred_name`` and the remaining ones are appended to
        ``variant_name``.
        """
        corporate_tags = [
            tag for tag in ("110", "111") if self.marc.get_fields(tag)
        ]
        if corporate_tags:
            tags = corporate_tags
            subfields = {"a": ", ", "b": ". ", "n": ", "}
        else:
            tags = ["100"]
            subfields = {"a": ", ", "b": ", ", "c": ", "}
        if self.logger and self.verbose:
            self.logger.info("Call Function", "trans_gnd_preferred_name")
        # Extend an already stored variant list rather than starting afresh.
        variant_names = self.json_dict.get("variant_name", [])
        for tag in tags:
            names = list(
                build_string_list_from_fields(
                    record=self.marc, tag=tag, subfields=subfields
                )
            )
            if names:
                self.json_dict["preferred_name"] = names[0]
                variant_names.extend(names[1:])
        if variant_names:
            self.json_dict["variant_name"] = variant_names
335

336
    def trans_gnd_authorized_access_point(self):
        """Transformation authorized_access_point 100/110/111.

        Records with a 110 or 111 field are typed ``bf:Organisation`` and use
        those tags (110 first); all others are typed ``bf:Person`` and use
        100. The first built string becomes ``authorized_access_point``;
        further ones are appended to ``variant_access_point``. ``type`` is
        only set when at least one string was built.
        """
        if self.marc.get_fields("110", "111"):
            agent = "bf:Organisation"
            tags = [tag for tag in ("110", "111") if self.marc.get_fields(tag)]
            subfields = {
                "a": ", ",
                "b": ". ",
                "n": ", ",
                "d": ", ",
                "c": ", ",
                "e": ". ",
                "g": ". ",
                "k": ". ",
                "t": ". ",
                "x": " - - ",
            }
        else:
            agent = "bf:Person"
            tags = ["100"]
            subfields = {"a": ", ", "b": ", ", "c": ", ", "d": ", ", "x": " - - "}

        if self.logger and self.verbose:
            self.logger.info("Call Function", "trans_gnd_authorized_access_point")
        variant_access_points = self.json_dict.get("variant_access_point", [])
        for tag in tags:
            for access_point in build_string_list_from_fields(
                record=self.marc, tag=tag, subfields=subfields
            ):
                self.json_dict["type"] = agent
                if not self.json_dict.get("authorized_access_point"):
                    self.json_dict["authorized_access_point"] = access_point
                else:
                    variant_access_points.append(access_point)
        if variant_access_points:
            self.json_dict["variant_access_point"] = variant_access_points
376

377
    def trans_gnd_variant_name(self):
        """Transformation variant_name 400/410/411.

        A single tag is read: 411 when present, otherwise 410 when present,
        otherwise 400. The built strings are appended to any ``variant_name``
        list already stored.
        """
        if self.logger and self.verbose:
            self.logger.info("Call Function", "trans_gnd_variant_name")
        if self.marc.get_fields("410") or self.marc.get_fields("411"):
            subfields = {"a": ", ", "b": ". ", "n": ", "}
            tag = "411" if self.marc.get_fields("411") else "410"
        else:
            subfields = {"a": ", ", "b": ", ", "c": ", "}
            tag = "400"
        variant_names = self.json_dict.get("variant_name", [])
        found = build_string_list_from_fields(
            record=self.marc, tag=tag, subfields=subfields
        )
        if found:
            variant_names += found
        if variant_names:
            self.json_dict["variant_name"] = variant_names
396

397
    def trans_gnd_variant_access_point(self):
        """Transformation variant_access_point 400/410/411.

        A single tag is read: 411 when present, otherwise 410 when present,
        otherwise 400. The built strings are appended to any
        ``variant_access_point`` list already stored in ``json_dict``.
        """
        tag = "400"
        subfields = {"a": ", ", "b": ", ", "c": ", ", "d": ", ", "x": " - - "}
        if self.marc.get_fields("410") or self.marc.get_fields("411"):
            subfields = {
                "a": ", ",
                "b": ". ",
                "n": ", ",
                "d": ", ",
                "c": ", ",
                "e": ". ",
                "g": ". ",
                "k": ". ",
                "t": ". ",
                "x": " - - ",
            }
            if self.marc.get_fields("410"):
                tag = "410"
            if self.marc.get_fields("411"):
                tag = "411"
        if self.logger and self.verbose:
            self.logger.info("Call Function", "trans_gnd_variant_access_point")
        # Merge instead of overwrite: trans_gnd_authorized_access_point may
        # already have stored additional 1XX access points under this key,
        # and trans_gnd_variant_name merges in the same way.
        variant_access_points = self.json_dict.get("variant_access_point", [])
        if variant_access_point := build_string_list_from_fields(
            record=self.marc, tag=tag, subfields=subfields
        ):
            variant_access_points += variant_access_point
        if variant_access_points:
            self.json_dict["variant_access_point"] = variant_access_points
424

425
    def trans_gnd_parallel_access_point(self):
        """Transformation parallel_access_point 700/710/711.

        A single tag is read: 711 when present, otherwise 710 when present,
        otherwise 700. A non-empty result replaces ``parallel_access_point``.
        """
        if self.marc.get_fields("710") or self.marc.get_fields("711"):
            subfields = {
                "a": ", ",
                "b": ". ",
                "n": ", ",
                "d": ", ",
                "c": ", ",
                "e": ". ",
                "g": ". ",
                "k": ". ",
                "t": ". ",
                "x": " - - ",
            }
            tag = "711" if self.marc.get_fields("711") else "710"
        else:
            subfields = {"a": ", ", "b": ", ", "c": ", ", "d": ", ", "x": " - - "}
            tag = "700"
        if self.logger and self.verbose:
            self.logger.info("Call Function", "trans_gnd_parallel_access_point")
        access_points = build_string_list_from_fields(
            record=self.marc, tag=tag, subfields=subfields
        )
        if access_points:
            self.json_dict["parallel_access_point"] = access_points
452

453
    def trans_gnd_country_associated(self):
1✔
454
        """Transformation country_associated 043 $c codes ISO 3166-1."""
455
        if self.logger and self.verbose:
1✔
456
            self.logger.info("Call Function", "trans_gnd_country_associated")
1✔
457
        if fields_043 := self.marc.get_fields("043"):
1✔
458
            if fields_043[0].get("c"):
1✔
459
                country_split = fields_043[0]["c"].split("-")
1✔
460
                if len(country_split) > 1:
1✔
461
                    country = COUNTRY_UNIMARC_MARC21.get(country_split[1])
1✔
462
                    if COUNTRIES.get(country):
1✔
463
                        self.json_dict["country_associated"] = country
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc