• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sul-dlss / libsys-airflow / 16037048862

02 Jul 2025 10:24PM UTC coverage: 88.695% (+0.1%) from 88.55%
16037048862

Pull #1576

github

web-flow
Merge f3f9e497c into 2aaed4fbf
Pull Request #1576: Adds courses data extract for Canvas.

99 of 106 new or added lines in 1 file covered. (93.4%)

7 existing lines in 1 file now uncovered.

4739 of 5343 relevant lines covered (88.7%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.52
/libsys_airflow/plugins/shared/utils.py
1
import httpx
1✔
2
import json
1✔
3
import logging
1✔
4
import pymarc
1✔
5
import re
1✔
6
import time
1✔
7
import urllib
1✔
8

9
from typing import Union
1✔
10

11
from airflow.configuration import conf
1✔
12
from airflow.models import Variable
1✔
13
from airflow.utils.email import send_email
1✔
14

15
from libsys_airflow.plugins.shared.folio_client import folio_client
1✔
16

17
# Module-level logger named after this module; used by the helpers and
# FolioAddMarcTags below for info/error messages.
logger = logging.getLogger(__name__)
1✔
18

19

20
def dag_run_url(**kwargs) -> str:
    """Build the Airflow web-UI grid URL for a DAG run.

    Keyword Args:
        dag_run: the DAG run; its ``run_id`` and ``dag.dag_id`` are used.
        airflow_url: optional base URL of the Airflow webserver. When missing
            or empty, ``[webserver] base_url`` from the Airflow config is used.

    Returns:
        ``<airflow_url>/dags/<dag_id>/grid?dag_run_id=<run_id>`` with the run
        id URL-encoded.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` has been loaded.
    from urllib.parse import urlencode

    dag_run = kwargs["dag_run"]
    airflow_url = kwargs.get("airflow_url") or conf.get('webserver', 'base_url')

    # Normalise the trailing slash for both the caller-supplied and the
    # configured URL so "dags/..." is never glued onto the host name.
    if not airflow_url.endswith("/"):
        airflow_url = f"{airflow_url}/"

    params = urlencode({"dag_run_id": dag_run.run_id})
    return f"{airflow_url}dags/{dag_run.dag.dag_id}/grid?{params}"
31

32

33
def is_production() -> bool:
    """Return True when the ``OKAPI_URL`` Airflow Variable contains ``prod``.

    Uses a substring test; the previous ``find("prod") > 0`` form silently
    treated a URL beginning with "prod" as non-production.
    """
    return "prod" in Variable.get("OKAPI_URL")
35

36

37
class MatchFolioRegex(str):
1✔
38

39
    def __eq__(self, pattern):
1✔
40
        return re.match(pattern, self)
1✔
41

42

43
def folio_name() -> Union[str, None]:
    """Return a display label for the FOLIO environment named by ``OKAPI_URL``.

    The ``OKAPI_URL`` Airflow Variable is tested against each pattern in order
    (stage, dev, test) using ``MatchFolioRegex`` equality. The label of the
    first matching pattern is returned; ``None`` is returned when none match
    (presumably a production URL).
    """
    okapi = MatchFolioRegex(Variable.get("OKAPI_URL"))
    labelled_patterns = (
        (r'.*stage.*', "Stage"),
        (r'.*dev.*', "Dev"),
        (r'.*test.*', "Test"),
    )
    for pattern, label in labelled_patterns:
        if okapi == pattern:
            return label
    return None
58

59

60
def send_email_with_server_name(**kwargs):
    """Send an e-mail via Airflow's ``send_email`` with a server-tagged subject.

    Keyword Args:
        to: recipient(s); defaults to the ``EMAIL_DEVS`` Airflow Variable.
        subject: subject line, passed through ``_subject_with_server_name``
            so non-production runs are labelled with the server name.
        html_content: HTML body of the message.
    """
    default_recipients = Variable.get("EMAIL_DEVS")
    send_email(
        to=kwargs.get("to", default_recipients),
        subject=_subject_with_server_name(subject=kwargs.get("subject")),
        html_content=kwargs.get("html_content"),
    )
74

75

76
def _subject_with_server_name(**kwargs):
    """Prefix an e-mail subject with the FOLIO host outside production.

    Keyword Args:
        subject: the subject line to decorate.

    Returns:
        ``subject`` unchanged when ``is_production()`` is true. Otherwise
        ``"<FOLIO_URL without http(s)://> - <subject>"``, where ``FOLIO_URL``
        is read from Airflow Variables with ``"folio-test/stage"`` as default.
    """
    raw_subject = kwargs.get("subject")
    server = Variable.get("FOLIO_URL", "folio-test/stage")
    if not is_production():
        host = re.sub('https?://', '', server)
        return f"{host} - {raw_subject}"
    return raw_subject
84

85

86
class FolioAddMarcTags(object):
    """Add MARC fields to FOLIO Instances through their source-storage (SRS) records.

    For an Instance:
    1. fetch its current SRS record;
    2. merge the requested fields into the record's MARC JSON, skipping exact
       duplicates;
    3. PUT the result to ``change-manager/parsedRecords``;
    4. after ``SLEEP`` seconds, re-read the SRS record to confirm the new
       fields are present.
    """

    # Seconds to wait after the PUT is acknowledged (HTTP 202) before
    # re-reading SRS -- presumably to let FOLIO finish applying the update.
    SLEEP = 30

    def __init__(self, **kwargs):
        # kwargs are accepted for call-site compatibility but are not used.
        self.httpx_client = httpx.Client()
        self.folio_client = folio_client()

    def put_folio_records(self, marc_instance_tags: dict, instance_id: str) -> bool:
        """Add ``marc_instance_tags`` to an Instance's SRS record and verify them.

        Args:
            marc_instance_tags: mapping of MARC tag -> list of
                ``{'ind1': .., 'ind2': .., 'subfields': [{code: value}, ...]}``.
            instance_id: UUID of the FOLIO Instance.

        Returns:
            True when FOLIO acknowledges the update with HTTP 202 and the
            re-read SRS record contains every requested field; False otherwise.
        """
        try:
            srs_record = self.__get_srs_record__(instance_id)
            # A None srs_record raises TypeError; a malformed SRS record or
            # Instance payload raises KeyError. Either way we cannot proceed.
            srs_uuid = srs_record["recordId"]  # type: ignore
            marc_json = srs_record["parsedRecord"]["content"]  # type: ignore
            version, instance_hrid = self.__instance_info__(instance_id)
        except (KeyError, TypeError):
            logger.error(
                f"Failed to retrieve Active SRS uuid for Instance {instance_id}"
            )
            return False

        put_result = self.httpx_client.put(
            f"{self.folio_client.okapi_url}/change-manager/parsedRecords/{srs_uuid}",
            headers=self.folio_client.okapi_headers,
            json={
                "id": srs_uuid,
                "recordType": "MARC_BIB",
                # The Instance's _version is sent as the related record
                # version (presumably for optimistic locking -- confirm).
                "relatedRecordVersion": version,
                "parsedRecord": {
                    "content": self.__marc_json_with_new_tags__(
                        marc_json, marc_instance_tags
                    )
                },
                "externalIdsHolder": {
                    "instanceId": instance_id,
                    "instanceHrid": instance_hrid,
                },
            },
        )
        if put_result.status_code != 202:
            logger.error(
                f"Failed to update FOLIO for Instance {instance_id} with SRS {srs_uuid}"
            )
            return False

        logger.info(
            f"Request acknowledged to update FOLIO Instance {instance_id} with SRS {srs_uuid}"
        )
        logger.info("Verifying new tags in SRS record...")
        time.sleep(self.SLEEP)

        srs_update = self.__get_srs_record__(instance_id)
        if srs_update is None:
            # __get_srs_record__ returns None on failure; without this guard
            # the subscript below would raise TypeError.
            logger.error(
                f"Failed to re-read SRS record {srs_uuid} for Instance {instance_id}"
            )
            return False
        srs_fields = srs_update["parsedRecord"]["content"]["fields"]

        srs_updated = self.__srs_record_updated__(srs_fields, marc_instance_tags)
        logger.info(f"SRS record updated: {srs_updated}")
        return srs_updated

    def __srs_record_updated__(self, srs_fields, marc_instance_tags) -> bool:
        """Return True when every requested field value is present in ``srs_fields``.

        ``srs_fields`` is the ``fields`` list of the SRS record's MARC JSON;
        each entry is a dict keyed by tag. A requested value counts as present
        when ANY field with the same tag has an equal value. Pre-existing
        fields with that tag therefore do not cause false negatives, and
        every tag in ``marc_instance_tags`` is checked under its own key.
        An empty ``marc_instance_tags`` is vacuously satisfied; an empty
        ``srs_fields`` satisfies nothing that was requested.
        """
        return all(
            any(
                tag in srs_field and srs_field[tag] == expected
                for srs_field in srs_fields
            )
            for tag, expected_values in marc_instance_tags.items()
            for expected in expected_values
        )

    def __get_srs_record__(self, instance_uuid: str) -> Union[dict, None]:
        """Return the first source record for an Instance, or None on failure.

        Network errors from ``folio_get`` propagate. A missing or empty
        ``sourceRecords`` list, or any error while reading it, is logged and
        reported as None.
        """
        source_storage_result = self.folio_client.folio_get(
            f"/source-storage/source-records?instanceId={instance_uuid}"
        )

        try:
            source_records = source_storage_result['sourceRecords']
            if len(source_records) < 1:
                logger.error(f"No Active SRS record found for {instance_uuid}")
                return None
            return source_records[0]

        except Exception as e:
            # Deliberately best-effort: callers treat None as "no record".
            logger.error(
                f"Failed to retrieve Active SRS record for Instance {instance_uuid} error: {e}"
            )
            return None

    def __instance_info__(self, instance_uuid: str) -> tuple:
        """Return ``(_version, hrid)`` of the FOLIO Instance ``instance_uuid``."""
        instance = self.folio_client.folio_get(f"/inventory/instances/{instance_uuid}")
        version = instance["_version"]
        hrid = instance["hrid"]
        return version, hrid

    def __marc_json_with_new_tags__(self, marc_json: dict, marc_instances_tags: dict):
        """Return the record built from ``marc_json`` plus the requested fields.

        The result is the record re-serialised with pymarc ``Record.as_json()``.
        A new field is skipped when its text form (``str(field)``) matches an
        existing field with the same tag, or one already added earlier in the
        same batch.
        """
        reader = pymarc.reader.JSONReader(json.dumps(marc_json))
        record = next(iter(reader))  # always one record in this context

        for tag_name, indicator_subfields in marc_instances_tags.items():
            logger.info(f"Constructing MARC tag {tag_name}")
            # Text form of each existing field with this tag (empty if none).
            existing_tags = [str(field) for field in record.get_fields(tag_name)]
            if existing_tags:
                logger.info(
                    f"Record has existing {tag_name}'s. New fields will be evaluated for uniqueness."
                )
            else:
                logger.info(
                    f"Record does not have existing {tag_name}'s. New fields will be added."
                )
            # indicator_subfields:
            # [{'ind1': ' ', 'ind2': ' ', 'subfields': [{'f': 'STEINMETZ'}, ...]},
            # {'ind1': ' ', 'ind2': ' ', 'subfields': [{'f': 'WHITEHEAD'}, ...]}]
            new_tags = []
            for row in indicator_subfields:
                new_field = self.__construct_new_field__(row, tag_name)
                if self.__tag_is_unique__(existing_tags, new_field):
                    logger.info(f"New field {new_field.tag} is unique tag.")
                    new_tags.append(new_field)
                    # Remember it so an identical row later in this batch is
                    # treated as a duplicate rather than added twice.
                    existing_tags.append(str(new_field))
                else:
                    logger.info(f"New field {new_field.tag} is not unique")

            for new_field in new_tags:
                record.add_ordered_field(new_field)

        record_json = record.as_json()
        logger.info(f"Constructing MARC record: {record_json}")
        return record_json

    def __construct_new_field__(
        self, indicator_subfields: dict, tag_name: str
    ) -> "pymarc.Field":
        """Build a pymarc Field for ``tag_name`` from one indicator/subfield row.

        ``indicator_subfields`` has ``'ind1'``, ``'ind2'`` and ``'subfields'``
        (a list of ``{code: value}`` dicts).
        """
        new_field = pymarc.Field(
            tag=tag_name, indicators=[indicator_subfields['ind1'], indicator_subfields['ind2']]  # type: ignore
        )
        for subfields in indicator_subfields['subfields']:
            self.__construct_new_subfields__(new_field, subfields)

        return new_field

    def __construct_new_subfields__(self, field: "pymarc.Field", subfields: dict):
        """Append each ``code: value`` pair in ``subfields`` to ``field``; return it."""
        for sf_code, sf_val in subfields.items():
            field.add_subfield(sf_code, sf_val)

        return field

    def __tag_is_unique__(self, fields: list, new_field: "pymarc.Field") -> bool:
        """Return False when ``str(new_field)`` already appears in ``fields``."""
        new_field_string = str(new_field)
        if new_field_string in fields:
            logger.info(f"Skip adding duplicated {new_field_string} field")
            return False

        logger.info(f"{new_field_string} tag is unique")
        return True
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc