• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sul-dlss / libsys-airflow / 25943250004

15 May 2026 09:53PM UTC coverage: 87.809%. Remained the same
25943250004

Pull #1783

github

web-flow
Merge 366c542bb into 430c0efa2
Pull Request #1783: Fixes dag_run.dag.dag_id which no longer works in Airflow 3

9 of 9 new or added lines in 3 files covered. (100.0%)

9 existing lines in 2 files now uncovered.

5474 of 6234 relevant lines covered (87.81%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.94
/libsys_airflow/plugins/shared/utils.py
1
import httpx
1✔
2
import json
1✔
3
import logging
1✔
4
import pymarc
1✔
5
import re
1✔
6
import time
1✔
7
import urllib
1✔
8

9
from typing import Union
1✔
10
from datetime import datetime, timezone
1✔
11

12
from airflow.configuration import conf
1✔
13
from airflow.sdk import Variable
1✔
14
from airflow_client.client.models.dag_run_response import DAGRunResponse
1✔
15
from airflow.utils.email import send_email
1✔
16

17
from libsys_airflow.plugins.shared.folio_client import folio_client
1✔
18

19
logger = logging.getLogger(__name__)
1✔
20

21

22
def execution_date() -> str:
    """
    Returns the current time in UTC formatted as an ISO 8601 string
    """
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
1✔
24

25

26
def dag_run_response_url(**kwargs) -> str:
    """
    Builds the URL of a DAG run from a dag_run object that exposes
    dag_id and dag_run_id attributes (annotated as DAGRunResponse)

    Keyword arguments:
        dag_run: required, the DAG run to link to
        airflow_url: optional base URL; when missing or empty the
            'base_url' option of the 'api' configuration section is used

    Returns "<base url>/dags/<dag_id>/runs/<dag_run_id>"
    """
    dag_run: DAGRunResponse = kwargs["dag_run"]
    airflow_url = kwargs.get("airflow_url") or conf.get('api', 'base_url')

    # Normalize the trailing slash for caller-supplied URLs as well as the
    # configured one, otherwise "https://host" becomes "https://hostdags/..."
    if not airflow_url.endswith("/"):
        airflow_url = f"{airflow_url}/"

    return f"{airflow_url}dags/{dag_run.dag_id}/runs/{dag_run.dag_run_id}"
1✔
36

37

38
def dag_run_url(**kwargs) -> str:
    """
    Builds the grid-view URL of a DAG run from a dag_run object that
    exposes dag_id and run_id attributes

    Keyword arguments:
        dag_run: required, the DAG run to link to
        airflow_url: optional base URL; when missing or empty the
            'base_url' option of the 'api' configuration section is used

    Returns "<base url>/dags/<dag_id>/grid?dag_run_id=<url-encoded run_id>"
    """
    dag_run = kwargs["dag_run"]
    airflow_url = kwargs.get("airflow_url") or conf.get('api', 'base_url')

    # Normalize the trailing slash for caller-supplied URLs as well as the
    # configured one, otherwise "https://host" becomes "https://hostdags/..."
    if not airflow_url.endswith("/"):
        airflow_url = f"{airflow_url}/"

    # run ids may contain characters such as ":" and "+" that must be escaped
    params = urllib.parse.urlencode({"dag_run_id": dag_run.run_id})
    return f"{airflow_url}dags/{dag_run.dag_id}/grid?{params}"
1✔
49

50

51
def is_production():
    """
    Returns True when the OKAPI_URL Airflow Variable contains "prod"
    """
    # Membership test instead of find(...) > 0, which reported False when
    # "prod" was at the very start of the value (find returns 0 there)
    return "prod" in Variable.get("OKAPI_URL")
1✔
53

54

55
class MatchFolioRegex(str):
    """
    str subclass whose equality test is a regular expression match, so an
    instance can be the subject of a match statement whose case literals
    are regex patterns
    """

    def __eq__(self, pattern):
        # Return a real bool rather than the re.Match object (or None);
        # re.match only anchors the pattern at the start of the string
        return re.match(pattern, self) is not None

    # Defining __eq__ sets __hash__ to None on the subclass, which would
    # make instances unhashable; keep the hashing inherited from str
    __hash__ = str.__hash__
1✔
59

60

61
def folio_name() -> Union[str, None]:
    """
    Returns a display name for the FOLIO environment derived from the
    OKAPI_URL Airflow Variable, or None when no pattern matches

    Patterns are tried in order (stage, dev, test); the first match wins
    """
    okapi_subject = MatchFolioRegex(Variable.get("OKAPI_URL"))
    environment_patterns = (
        (r'.*stage.*', "Stage"),
        (r'.*dev.*', "Dev"),
        (r'.*test.*', "Test"),
    )
    for pattern, label in environment_patterns:
        # MatchFolioRegex.__eq__ performs the regex match
        if okapi_subject == pattern:
            return label
    return None
1✔
76

77

78
def send_email_with_server_name(**kwargs):
    """
    Wrapper around send_email that passes the subject through
    _subject_with_server_name before sending

    Keyword arguments:
        to: recipients, defaults to the EMAIL_DEVS Airflow Variable
        subject: email subject
        html_content: email body
    """
    # Looked up unconditionally, even when "to" is supplied
    default_recipients = Variable.get("EMAIL_DEVS")
    send_email(
        to=kwargs.get("to", default_recipients),
        subject=_subject_with_server_name(subject=kwargs.get("subject")),
        html_content=kwargs.get("html_content"),
    )
92

93

94
def _subject_with_server_name(**kwargs):
    """
    Returns the subject unchanged in production, otherwise the subject
    prefixed with the FOLIO_URL Airflow Variable (scheme removed) and " - "
    """
    raw_subject = kwargs.get("subject")
    folio_host = Variable.get("FOLIO_URL", "folio-test/stage")

    if is_production():
        return raw_subject

    # Strip the leading http:// or https:// so only the host part is shown
    folio_host = re.sub('https?://', '', folio_host)
    return f"{folio_host} - {raw_subject}"
1✔
102

103

104
class FolioAddMarcTags(object):
    """
    Adds MARC fields to the SRS record of a FOLIO Instance through the
    change-manager API and then verifies that the fields are present
    """

    # Seconds to wait after the update is acknowledged before re-reading
    # the SRS record for verification
    SLEEP = 30

    def __init__(self, **kwargs):
        self.httpx_client = httpx.Client()
        self.folio_client = folio_client()

    def put_folio_records(self, marc_instance_tags: dict, instance_id: str) -> bool:
        """
        Adds the given MARC tags to the SRS record of an Instance

        marc_instance_tags maps a tag name to a list of dicts with
        'ind1', 'ind2' and 'subfields' keys.

        Returns True only when the PUT was answered with 202 and every
        requested field is found in the SRS record read back afterwards,
        False otherwise.
        """
        try:
            srs_record = self.__get_srs_record__(instance_id)
            srs_uuid = srs_record["recordId"]  # type: ignore
            marc_json = srs_record["parsedRecord"]["content"]  # type: ignore
            version, instance_hrid = self.__instance_info__(instance_id)
        except TypeError:
            # __get_srs_record__ returns None when no record could be
            # retrieved, which makes the subscripting above raise TypeError
            logger.error(
                f"Failed to retrieve Active SRS uuid for Instance {instance_id}"
            )
            return False

        put_result = self.httpx_client.put(
            f"{self.folio_client.okapi_url}/change-manager/parsedRecords/{srs_uuid}",
            headers=self.folio_client.okapi_headers,
            json={
                "id": srs_uuid,
                "recordType": "MARC_BIB",
                "relatedRecordVersion": version,
                "parsedRecord": {
                    "content": self.__marc_json_with_new_tags__(
                        marc_json, marc_instance_tags
                    )
                },
                "externalIdsHolder": {
                    "instanceId": instance_id,
                    "instanceHrid": instance_hrid,
                },
            },
        )
        if put_result.status_code != 202:
            logger.error(
                f"Failed to update FOLIO for Instance {instance_id} with SRS {srs_uuid}"
            )
            return False

        logger.info(
            f"Request acknowledged to update FOLIO Instance {instance_id} with SRS {srs_uuid}"
        )
        logger.info("Verifying new tags in SRS record...")
        time.sleep(self.SLEEP)

        srs_update = self.__get_srs_record__(instance_id)
        if srs_update is None:
            # The verification read can fail just like the initial one;
            # report it instead of raising TypeError on the subscript below
            logger.error(
                f"Failed to retrieve SRS record to verify update for Instance {instance_id}"
            )
            return False
        srs_fields = srs_update["parsedRecord"]["content"]["fields"]

        srs_updated = self.__srs_record_updated__(srs_fields, marc_instance_tags)
        logger.info(f"SRS record updated: {srs_updated}")
        return srs_updated

    def __srs_record_updated__(self, srs_fields, marc_instance_tags) -> bool:
        """
        Returns True when every requested field is present in srs_fields

        srs_fields is a list of single-key dicts ({tag: field content});
        each requested value of each tag in marc_instance_tags must equal
        the content of at least one SRS field carrying that tag.
        """
        for tag_name, tag_values in marc_instance_tags.items():
            for tag_val in tag_values:
                # A record may hold several fields with the same tag, so
                # look at all of them rather than only the first one
                found = any(
                    tag_name in srs_field and srs_field[tag_name] == tag_val
                    for srs_field in srs_fields
                )
                if not found:
                    return False

        return True

    def __get_srs_record__(self, instance_uuid: str) -> Union[dict, None]:
        """
        Returns the first source record FOLIO reports for the Instance,
        or None when there is none or the response could not be read
        """
        source_storage_result = self.folio_client.folio_get(
            f"/source-storage/source-records?instanceId={instance_uuid}"
        )

        try:
            source_records = source_storage_result['sourceRecords']
            if len(source_records) < 1:
                logger.error(f"No Active SRS record found for {instance_uuid}")
                return None
            return source_records[0]

        except Exception as e:
            logger.error(
                f"Failed to retrieve Active SRS record for Instance {instance_uuid} error: {e}"
            )
            return None

    def __instance_info__(self, instance_uuid: str) -> tuple:
        """
        Returns the (_version, hrid) pair of the Instance
        """
        instance = self.folio_client.folio_get(f"/inventory/instances/{instance_uuid}")
        version = instance["_version"]
        hrid = instance["hrid"]
        return version, hrid

    def __marc_json_with_new_tags__(self, marc_json: dict, marc_instances_tags: dict):
        """
        Returns the MARC record as a JSON string after adding the requested
        fields that are not already present in the record
        """
        reader = pymarc.reader.JSONReader(json.dumps(marc_json))
        record = [record for record in reader][0]  # always one record in this context

        for tag_name, indicator_subfields in marc_instances_tags.items():
            logger.info(f"Constructing MARC tag {tag_name}")
            existing_tags = [
                str(field) for field in record.get_fields(tag_name)
            ]  # returns list of strings or empty if record doesn't have any
            if existing_tags:
                logger.info(
                    f"Record has existing {tag_name}'s. New fields will be evaluated for uniqueness."
                )
            else:
                logger.info(
                    f"Record does not have existing {tag_name}'s. New fields will be added."
                )
            # indicator_subfields:
            # [{'ind1': ' ', 'ind2': ' ', 'subfields': [{'f': 'STEINMETZ'}, ...]},
            # {'ind1': ' ', 'ind2': ' ', 'subfields': [{'f': 'WHITEHEAD'}, ...]}]
            new_tags = []
            for row in indicator_subfields:
                new_field = self.__construct_new_field__(row, tag_name)
                if self.__tag_is_unique__(existing_tags, new_field):
                    logger.info(f"New field {new_field.tag} is unique tag.")
                    new_tags.append(new_field)
                else:
                    logger.info(f"New field {new_field.tag} is not unique")

            for x in new_tags:
                record.add_ordered_field(x)

        record_json = record.as_json()
        logger.info(f"Constructing MARC record: {record_json}")
        return record_json

    def __construct_new_field__(
        self, indicator_subfields: dict, tag_name: str
    ) -> "pymarc.Field":
        """
        Builds a pymarc Field for tag_name from a dict with 'ind1', 'ind2'
        and 'subfields' keys
        """
        new_field = pymarc.Field(
            tag=tag_name, indicators=[indicator_subfields['ind1'], indicator_subfields['ind2']]  # type: ignore
        )
        for subfields in indicator_subfields['subfields']:
            self.__construct_new_subfields__(new_field, subfields)

        return new_field

    def __construct_new_subfields__(self, field: "pymarc.Field", subfields: dict):
        """
        Adds every code/value pair of subfields to field and returns it
        """
        for sf_code, sf_val in subfields.items():
            field.add_subfield(sf_code, sf_val)

        return field

    def __tag_is_unique__(self, fields: list, new_field: "pymarc.Field") -> bool:
        """
        Returns False when the string form of new_field is already in
        fields (a list of field strings), True otherwise
        """
        new_field_string = str(new_field)
        if new_field_string in fields:
            logger.info(f"Skip adding duplicated {new_field_string} field")
            return False

        logger.info(f"{new_field_string} tag is unique")
        return True
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc