• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sul-dlss / libsys-airflow / 26060872109

18 May 2026 09:14PM UTC coverage: 87.807% (-0.002%) from 87.809%
26060872109

Pull #1783

github

web-flow
Merge 347abbe19 into d9e221c99
Pull Request #1783: Fixes dag_run.dag.dag_id which no longer works in Airflow 3

9 of 9 new or added lines in 3 files covered. (100.0%)

9 existing lines in 2 files now uncovered.

5473 of 6233 relevant lines covered (87.81%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.9
/libsys_airflow/plugins/shared/utils.py
1
import httpx
1✔
2
import json
1✔
3
import logging
1✔
4
import pymarc
1✔
5
import re
1✔
6
import time
1✔
7

8
from typing import Union
1✔
9
from datetime import datetime, timezone
1✔
10
from urllib.parse import quote
1✔
11

12
from airflow.configuration import conf
1✔
13
from airflow.sdk import Variable
1✔
14
from airflow_client.client.models.dag_run_response import DAGRunResponse
1✔
15
from airflow.utils.email import send_email
1✔
16

17
from libsys_airflow.plugins.shared.folio_client import folio_client
1✔
18

19
# Module-level logger, named after this module, used by the helpers below.
logger = logging.getLogger(__name__)
1✔
20

21

22
def execution_date() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
1✔
24

25

26
def dag_run_response_url(**kwargs) -> str:
    """
    Build the URL of a DAG run from an Airflow REST client response.

    Keyword Args:
        dag_run: object exposing ``dag_id`` and ``dag_run_id`` attributes
            (annotated as ``DAGRunResponse``).
        airflow_url: optional base URL. When missing or empty, the
            ``base_url`` option of the ``api`` section of the Airflow
            configuration is used instead.

    Returns:
        ``<base url>/dags/<dag_id>/runs/<dag_run_id>``

    Raises:
        KeyError: if ``dag_run`` is not supplied.
    """
    dag_run: DAGRunResponse = kwargs["dag_run"]
    airflow_url = kwargs.get("airflow_url") or conf.get('api', 'base_url')

    # Normalize the base regardless of where it came from; a caller-supplied
    # URL without a trailing slash used to produce "https://hostdags/...".
    if not airflow_url.endswith("/"):
        airflow_url = f"{airflow_url}/"

    return f"{airflow_url}dags/{dag_run.dag_id}/runs/{dag_run.dag_run_id}"
1✔
36

37

38
def dag_run_url(**kwargs) -> str:
    """
    Build the URL of a DAG run from a ``dag_run`` object.

    Keyword Args:
        dag_run: object exposing ``dag_id`` and ``run_id`` attributes.
        airflow_url: optional base URL. When missing or empty, the
            ``base_url`` option of the ``api`` section of the Airflow
            configuration is used instead.

    Returns:
        ``<base url>/dags/<dag_id>/runs/<url-quoted run_id>``

    Raises:
        KeyError: if ``dag_run`` is not supplied.
    """
    dag_run = kwargs["dag_run"]
    airflow_url = kwargs.get("airflow_url") or conf.get('api', 'base_url')

    # Normalize the base regardless of where it came from; a caller-supplied
    # URL without a trailing slash used to produce "https://hostdags/...".
    if not airflow_url.endswith("/"):
        airflow_url = f"{airflow_url}/"

    # The run id is percent-encoded before being placed in the path.
    return f"{airflow_url}dags/{dag_run.dag_id}/runs/{quote(dag_run.run_id)}"
1✔
48

49

50
def is_production():
    """
    Report whether the OKAPI_URL Airflow Variable points at production.

    Returns:
        True when the value of ``OKAPI_URL`` contains the substring "prod".
    """
    # A membership test also matches "prod" at index 0, which the previous
    # `find(...) > 0` comparison silently treated as "not found".
    return "prod" in Variable.get("OKAPI_URL")
1✔
52

53

54
class MatchFolioRegex(str):
    """
    String whose ``==`` operator performs a regular-expression match.

    The right-hand operand is treated as a pattern and matched against this
    string with ``re.match``, which lets a ``match``/``case`` statement use
    regex literals as its cases.
    """

    def __eq__(self, pattern):
        """
        Return True when `pattern` matches this string from its start.

        Operands that are neither a str nor a compiled pattern yield
        NotImplemented so the comparison evaluates to False instead of
        raising TypeError inside ``re.match``.
        """
        if not isinstance(pattern, (str, re.Pattern)):
            return NotImplemented
        # Return a real bool rather than leaking the re.Match object.
        return re.match(pattern, self) is not None
1✔
58

59

60
def folio_name() -> Union[str, None]:
    """
    Derive a display name for the FOLIO environment from OKAPI_URL.

    The Variable's value is compared, in order, against the stage, dev and
    test patterns; the label of the first pattern that matches is returned,
    or None when none of them match.
    """
    okapi_url = MatchFolioRegex(Variable.get("OKAPI_URL"))
    labelled_patterns = (
        (r'.*stage.*', "Stage"),
        (r'.*dev.*', "Dev"),
        (r'.*test.*', "Test"),
    )
    for pattern, label in labelled_patterns:
        # MatchFolioRegex overrides == to run a regex match
        if okapi_url == pattern:
            return label
    return None
1✔
75

76

77
def send_email_with_server_name(**kwargs):
    """
    send_email wrapper to include subject with server name
        when not run in production

    Reads ``to``, ``subject`` and ``html_content`` from kwargs; ``to``
    falls back to the EMAIL_DEVS Variable when it is not supplied.
    """
    # Looked up unconditionally, even when a "to" address is supplied
    default_recipients = Variable.get("EMAIL_DEVS")
    send_email(
        to=kwargs.get("to", default_recipients),
        subject=_subject_with_server_name(subject=kwargs.get("subject")),
        html_content=kwargs.get("html_content"),
    )
91

92

93
def _subject_with_server_name(**kwargs):
    """
    Prefix the ``subject`` kwarg with the FOLIO host when not in production.

    In production the subject is returned unchanged; otherwise the
    FOLIO_URL Variable (default "folio-test/stage"), stripped of its
    http(s) scheme, is prepended.
    """
    subject = kwargs.get("subject")
    folio_url = Variable.get("FOLIO_URL", "folio-test/stage")
    if not is_production():
        server_name = re.sub('https?://', '', folio_url)
        return f"{server_name} - {subject}"
    return subject
1✔
101

102

103
class FolioAddMarcTags(object):
    """
    Adds MARC fields to the SRS record of a FOLIO Instance.

    The current source record is fetched, the new fields are merged into its
    MARC JSON, the result is PUT to change-manager, and after a pause the
    record is re-read to confirm that the new fields are present.
    """

    # Seconds to wait after the PUT before re-reading the SRS record
    SLEEP = 30

    def __init__(self, **kwargs):
        self.httpx_client = httpx.Client()
        self.folio_client = folio_client()

    def put_folio_records(self, marc_instance_tags: dict, instance_id: str) -> bool:
        """
        Add `marc_instance_tags` to the SRS record of Instance `instance_id`.

        Args:
            marc_instance_tags: maps a MARC tag to a list of field dicts with
                'ind1', 'ind2' and 'subfields' keys.
            instance_id: UUID of the FOLIO Instance.

        Returns:
            True when the PUT is answered with 202 and every requested field
            is found in the re-read SRS record, False otherwise.
        """
        try:
            srs_record = self.__get_srs_record__(instance_id)
            # A missing SRS record is None; subscripting it raises TypeError
            srs_uuid = srs_record["recordId"]  # type: ignore
            marc_json = srs_record["parsedRecord"]["content"]  # type: ignore
            version, instance_hrid = self.__instance_info__(instance_id)
        except TypeError:
            logger.error(
                f"Failed to retrieve Active SRS uuid for Instance {instance_id}"
            )
            return False

        put_result = self.httpx_client.put(
            f"{self.folio_client.okapi_url}/change-manager/parsedRecords/{srs_uuid}",
            headers=self.folio_client.okapi_headers,
            json={
                "id": srs_uuid,
                "recordType": "MARC_BIB",
                "relatedRecordVersion": version,
                "parsedRecord": {
                    "content": self.__marc_json_with_new_tags__(
                        marc_json, marc_instance_tags
                    )
                },
                "externalIdsHolder": {
                    "instanceId": instance_id,
                    "instanceHrid": instance_hrid,
                },
            },
        )
        if put_result.status_code != 202:
            logger.error(
                f"Failed to update FOLIO for Instance {instance_id} with SRS {srs_uuid}"
            )
            return False

        logger.info(
            f"Request acknowledged to update FOLIO Instance {instance_id} with SRS {srs_uuid}"
        )
        logger.info("Verifying new tags in SRS record...")
        time.sleep(self.SLEEP)

        srs_update = self.__get_srs_record__(instance_id)
        if srs_update is None:
            # The lookup can fail after the PUT; report failure instead of
            # raising TypeError when subscripting None.
            logger.error(
                f"Unable to verify SRS {srs_uuid} for Instance {instance_id}"
            )
            return False
        srs_fields = srs_update["parsedRecord"]["content"]["fields"]

        srs_updated = self.__srs_record_updated__(srs_fields, marc_instance_tags)
        logger.info(f"SRS record updated: {srs_updated}")
        return srs_updated

    def __srs_record_updated__(self, srs_fields, marc_instance_tags) -> bool:
        """
        Check that every requested field is present in the SRS fields.

        Args:
            srs_fields: list of single-key dicts, each mapping a MARC tag to
                that field's content.
            marc_instance_tags: maps a MARC tag to the list of field contents
                that were requested.

        Returns:
            True when each requested content equals the content of at least
            one SRS field stored under the same tag (vacuously True when
            nothing was requested), otherwise False.
        """
        for tag, expected_values in marc_instance_tags.items():
            # Contents of every SRS field carrying this tag, not just the first
            stored_values = [field[tag] for field in srs_fields if tag in field]
            for expected in expected_values:
                if expected not in stored_values:
                    return False
        return True

    def __get_srs_record__(self, instance_uuid: str) -> Union[dict, None]:
        """
        Return the first source record for `instance_uuid`, or None when the
        lookup yields no records or the response cannot be read.
        """
        source_storage_result = self.folio_client.folio_get(
            f"/source-storage/source-records?instanceId={instance_uuid}"
        )

        try:
            source_records = source_storage_result['sourceRecords']
            if len(source_records) < 1:
                logger.error(f"No Active SRS record found for {instance_uuid}")
                return None
            return source_records[0]

        except Exception as e:
            # Deliberately best-effort: callers treat None as "not found"
            logger.error(
                f"Failed to retrieve Active SRS record for Instance {instance_uuid} error: {e}"
            )
            return None

    def __instance_info__(self, instance_uuid: str) -> tuple:
        """Return the (_version, hrid) pair of the Instance record."""
        instance = self.folio_client.folio_get(f"/inventory/instances/{instance_uuid}")
        version = instance["_version"]
        hrid = instance["hrid"]
        return version, hrid

    def __marc_json_with_new_tags__(self, marc_json: dict, marc_instances_tags: dict):
        """
        Merge the requested fields into `marc_json` and return the record
        serialized with pymarc's ``as_json``.

        A new field is skipped when its string form equals that of a field
        already in the record under the same tag.
        """
        reader = pymarc.reader.JSONReader(json.dumps(marc_json))
        record = list(reader)[0]  # always one record in this context

        for tag_name, indicator_subfields in marc_instances_tags.items():
            logger.info(f"Constructing MARC tag {tag_name}")
            existing_tags = [
                str(field) for field in record.get_fields(tag_name)
            ]  # returns list of strings or empty if record doesn't have any
            if existing_tags:
                logger.info(
                    f"Record has existing {tag_name}'s. New fields will be evaluated for uniqueness."
                )
            else:
                logger.info(
                    f"Record does not have existing {tag_name}'s. New fields will be added."
                )
            # indicator_subfields:
            # [{'ind1': ' ', 'ind2': ' ', 'subfields': [{'f': 'STEINMETZ'}, ...]},
            # {'ind1': ' ', 'ind2': ' ', 'subfields': [{'f': 'WHITEHEAD'}, ...]}]
            new_tags = []
            for row in indicator_subfields:
                new_field = self.__construct_new_field__(row, tag_name)
                if self.__tag_is_unique__(existing_tags, new_field):
                    logger.info(f"New field {new_field.tag} is unique tag.")
                    new_tags.append(new_field)
                else:
                    logger.info(f"New field {new_field.tag} is not unique")

            # Added after the uniqueness pass, so new fields are compared only
            # against the fields the record had beforehand
            for new_tag in new_tags:
                record.add_ordered_field(new_tag)

        record_json = record.as_json()
        logger.info(f"Constructing MARC record: {record_json}")
        return record_json

    def __construct_new_field__(
        self, indicator_subfields: dict, tag_name: str
    ) -> pymarc.Field:
        """Build a pymarc Field for `tag_name` from one indicator/subfield row."""
        new_field = pymarc.Field(
            tag=tag_name, indicators=[indicator_subfields['ind1'], indicator_subfields['ind2']]  # type: ignore
        )
        for subfields in indicator_subfields['subfields']:
            self.__construct_new_subfields__(new_field, subfields)

        return new_field

    def __construct_new_subfields__(self, field: pymarc.Field, subfields: dict):
        """Add each code/value pair in `subfields` to `field` and return it."""
        for sf_code, sf_val in subfields.items():
            field.add_subfield(sf_code, sf_val)

        return field

    def __tag_is_unique__(self, fields: list, new_field: pymarc.Field) -> bool:
        """Return False when str(new_field) is already in `fields`, else True."""
        new_field_string = str(new_field)
        if new_field_string in fields:
            logger.info(f"Skip adding duplicated {new_field_string} field")
            return False

        logger.info(f"{new_field_string} tag is unique")
        return True
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc