• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sul-dlss / libsys-airflow / 26064565311

18 May 2026 10:38PM UTC coverage: 87.789% (-0.02%) from 87.809%
26064565311

Pull #1783

github

web-flow
Merge 223086ebf into d9e221c99
Pull Request #1783: Fixes dag_run.dag.dag_id which no longer works in Airflow 3

8 of 8 new or added lines in 3 files covered. (100.0%)

8 existing lines in 1 file now uncovered.

5464 of 6224 relevant lines covered (87.79%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.59
/libsys_airflow/plugins/shared/utils.py
1
import httpx
1✔
2
import json
1✔
3
import logging
1✔
4
import pymarc
1✔
5
import re
1✔
6
import time
1✔
7

8
from typing import Union
1✔
9
from datetime import datetime, timezone
1✔
10
from urllib.parse import quote
1✔
11

12
from airflow.configuration import conf
1✔
13
from airflow.sdk import Variable
1✔
14
from airflow.utils.email import send_email
1✔
15

16
from libsys_airflow.plugins.shared.folio_client import folio_client
1✔
17

18
# Module-level logger named after this module; used by the helpers below.
logger = logging.getLogger(__name__)
1✔
19

20

21
def execution_date() -> str:
    """Return the current UTC time as an ISO 8601 formatted string."""
    now_utc = datetime.now(timezone.utc)
    return now_utc.isoformat()
23

24

25
def dag_run_url(**kwargs) -> str:
    """Build the Airflow UI URL for a DAG run.

    Keyword Args:
        dag_run: object exposing ``dag_id`` and ``run_id`` attributes (required).
        airflow_url: optional base URL; when missing or empty, the ``base_url``
            option of the ``api`` configuration section is used instead.

    Returns:
        ``<base>/dags/<dag_id>/runs/<url-quoted run_id>``

    Raises:
        KeyError: if ``dag_run`` is not supplied.
    """
    dag_run = kwargs["dag_run"]
    airflow_url = kwargs.get("airflow_url") or conf.get('api', 'base_url')

    # Normalize the trailing slash for both the caller-supplied and the
    # configured base URL so the path segments are never glued to the host.
    if not airflow_url.endswith("/"):
        airflow_url = f"{airflow_url}/"

    return f"{airflow_url}dags/{dag_run.dag_id}/runs/{quote(dag_run.run_id)}"
35

36

37
def is_production():
    """Return True when the ``OKAPI_URL`` Airflow Variable contains "prod".

    Uses a substring membership test so a match at the very start of the
    value is also detected (``str.find(...) > 0`` would miss index 0).
    """
    return "prod" in Variable.get("OKAPI_URL")
39

40

41
class MatchFolioRegex(str):
    """String whose ``==`` comparison treats the right-hand side as a regex.

    ``MatchFolioRegex(text) == pattern`` is True when
    ``re.match(pattern, text)`` finds a match anchored at the start of the
    text. This allows regular-expression literals to be used as ``case``
    value patterns in a ``match`` statement.

    Because ``__eq__`` is overridden without ``__hash__``, instances are
    unhashable.
    """

    def __eq__(self, pattern):
        """Return True if ``pattern`` matches this string from its start.

        Objects that are neither ``str`` nor a compiled pattern are not
        comparable; ``NotImplemented`` is returned so Python falls back to
        its default handling instead of raising ``TypeError``.
        """
        if not isinstance(pattern, (str, re.Pattern)):
            return NotImplemented
        # Return a real bool rather than the Match object / None.
        return re.match(pattern, self) is not None
45

46

47
def folio_name() -> Union[str, None]:
1✔
48
    okapi_url = Variable.get("OKAPI_URL")
1✔
49
    match MatchFolioRegex(okapi_url):
1✔
50
        case r'.*stage.*':
1✔
51
            name = "Stage"
1✔
52

53
        case r'.*dev.*':
1✔
54
            name = "Dev"
1✔
55

56
        case r'.*test.*':
1✔
57
            name = "Test"
1✔
58

59
        case _:
1✔
60
            name = None  # type: ignore
1✔
61
    return name
1✔
62

63

64
def send_email_with_server_name(**kwargs):
    """
    send_email wrapper to include subject with server name
        when not run in production

    Keyword Args:
        to: recipient address(es); defaults to the ``EMAIL_DEVS`` Variable.
        subject: subject line, passed through _subject_with_server_name.
        html_content: message body handed to send_email unchanged.
    """
    # The default recipients are always looked up, even when "to" is given.
    default_recipients = Variable.get("EMAIL_DEVS")
    recipients = kwargs.get("to", default_recipients)
    send_email(
        to=recipients,
        subject=_subject_with_server_name(subject=kwargs.get("subject")),
        html_content=kwargs.get("html_content"),
    )
78

79

80
def _subject_with_server_name(**kwargs):
    """Return the subject, prefixed with the FOLIO host when not in production.

    Keyword Args:
        subject: the original subject line.

    In production the subject is returned unchanged. Otherwise the
    ``FOLIO_URL`` Variable (default "folio-test/stage"), with any leading
    http:// or https:// scheme removed, is prepended as "<host> - <subject>".
    """
    subject = kwargs.get("subject")
    folio_url = Variable.get("FOLIO_URL", "folio-test/stage")

    if is_production():
        return subject

    server_name = re.sub('https?://', '', folio_url)
    return f"{server_name} - {subject}"
88

89

90
class FolioAddMarcTags(object):
    """Add MARC tags to the SRS record of a FOLIO Instance and verify the result.

    The entry point is ``put_folio_records``: it fetches the Instance's SRS
    record, merges in the requested tags, PUTs the record to the
    change-manager endpoint, waits ``SLEEP`` seconds and then re-reads the
    SRS record to check that the tags are present.
    """

    # Seconds to wait after the PUT is acknowledged before re-reading the
    # SRS record for verification.
    SLEEP = 30

    def __init__(self, **kwargs):
        """Create the HTTP and FOLIO clients; keyword arguments are not used."""
        self.httpx_client = httpx.Client()
        self.folio_client = folio_client()

    def put_folio_records(self, marc_instance_tags: dict, instance_id: str) -> bool:
        """Add tags to an Instance's SRS MARC record and verify the update.

        Args:
            marc_instance_tags: mapping of MARC tag name to a list of
                ``{'ind1': ..., 'ind2': ..., 'subfields': [{code: value}, ...]}``
                dicts describing the fields to add.
            instance_id: UUID of the FOLIO Instance.

        Returns:
            True when the PUT is acknowledged (HTTP 202) and the re-read SRS
            record contains every requested tag value; False when the SRS
            record cannot be retrieved, the PUT is not acknowledged, or the
            verification fails.
        """
        try:
            srs_record = self.__get_srs_record__(instance_id)
            # A missing SRS record is None; subscripting it raises TypeError.
            srs_uuid = srs_record["recordId"]  # type: ignore
            marc_json = srs_record["parsedRecord"]["content"]  # type: ignore
            version, instance_hrid = self.__instance_info__(instance_id)
        except TypeError:
            logger.error(
                f"Failed to retrieve Active SRS uuid for Instance {instance_id}"
            )
            return False

        put_result = self.httpx_client.put(
            f"{self.folio_client.okapi_url}/change-manager/parsedRecords/{srs_uuid}",
            headers=self.folio_client.okapi_headers,
            json={
                "id": srs_uuid,
                "recordType": "MARC_BIB",
                "relatedRecordVersion": version,
                "parsedRecord": {
                    "content": self.__marc_json_with_new_tags__(
                        marc_json, marc_instance_tags
                    )
                },
                "externalIdsHolder": {
                    "instanceId": instance_id,
                    "instanceHrid": instance_hrid,
                },
            },
        )
        if put_result.status_code != 202:
            logger.error(
                f"Failed to update FOLIO for Instance {instance_id} with SRS {srs_uuid}"
            )
            return False

        logger.info(
            f"Request acknowledged to update FOLIO Instance {instance_id} with SRS {srs_uuid}"
        )
        logger.info("Verifying new tags in SRS record...")
        time.sleep(self.SLEEP)

        srs_update = self.__get_srs_record__(instance_id)
        if srs_update is None:
            # The lookup helper already logged the cause; report failure
            # instead of crashing on a None subscript.
            logger.error(
                f"Failed to retrieve updated SRS record for Instance {instance_id}"
            )
            return False
        srs_fields = srs_update["parsedRecord"]["content"]["fields"]

        srs_updated = self.__srs_record_updated__(srs_fields, marc_instance_tags)
        logger.info(f"SRS record updated: {srs_updated}")
        return srs_updated

    def __srs_record_updated__(self, srs_fields, marc_instance_tags) -> bool:
        """Return True when every requested tag value appears in ``srs_fields``.

        Args:
            srs_fields: list of single-tag dicts (``{tag: field_content}``)
                taken from the SRS record's parsed content.
            marc_instance_tags: mapping of tag name to the list of field
                contents that were requested.

        Each requested value must equal the content of at least one SRS
        field carrying the same tag. An empty request is trivially satisfied.
        """
        for tag_name, tag_values in marc_instance_tags.items():
            for tag_val in tag_values:
                found = any(
                    tag_name in srs_dict and srs_dict[tag_name] == tag_val
                    for srs_dict in srs_fields
                )
                if not found:
                    return False
        return True

    def __get_srs_record__(self, instance_uuid: str) -> Union[dict, None]:
        """Return the first source record for the Instance, or None.

        None is returned (and an error logged) when the response holds no
        source records or when reading the response fails.
        """
        source_storage_result = self.folio_client.folio_get(
            f"/source-storage/source-records?instanceId={instance_uuid}"
        )

        try:
            source_records = source_storage_result['sourceRecords']
            if len(source_records) < 1:
                logger.error(f"No Active SRS record found for {instance_uuid}")
                return None
            return source_records[0]

        except Exception as e:
            logger.error(
                f"Failed to retrieve Active SRS record for Instance {instance_uuid} error: {e}"
            )
            return None

    def __instance_info__(self, instance_uuid: str) -> tuple:
        """Return ``(_version, hrid)`` read from the inventory Instance record."""
        instance = self.folio_client.folio_get(f"/inventory/instances/{instance_uuid}")
        version = instance["_version"]
        hrid = instance["hrid"]
        return version, hrid

    def __marc_json_with_new_tags__(self, marc_json: dict, marc_instances_tags: dict):
        """Return the record's JSON (``record.as_json()``) with new tags added.

        Args:
            marc_json: MARC-in-JSON content of the existing SRS record.
            marc_instances_tags: mapping of tag name to a list of
                indicator/subfield dicts describing fields to add.

        A new field is added only when its string form differs from every
        field with the same tag already on the record.
        """
        reader = pymarc.reader.JSONReader(json.dumps(marc_json))
        record = [record for record in reader][0]  # always one record in this context

        for tag_name, indicator_subfields in marc_instances_tags.items():
            logger.info(f"Constructing MARC tag {tag_name}")
            existing_tags = [
                str(field) for field in record.get_fields(tag_name)
            ]  # returns list of strings or empty if record doesn't have any
            if existing_tags:
                logger.info(
                    f"Record has existing {tag_name}'s. New fields will be evaluated for uniqueness."
                )
            else:
                logger.info(
                    f"Record does not have existing {tag_name}'s. New fields will be added."
                )
            # indicator_subfields:
            # [{'ind1': ' ', 'ind2': ' ', 'subfields': [{'f': 'STEINMETZ'}, ...]},
            # {'ind1': ' ', 'ind2': ' ', 'subfields': [{'f': 'WHITEHEAD'}, ...]}]
            new_tags = []
            for row in indicator_subfields:
                new_field = self.__construct_new_field__(row, tag_name)
                if self.__tag_is_unique__(existing_tags, new_field):
                    logger.info(f"New field {new_field.tag} is unique tag.")
                    new_tags.append(new_field)
                else:
                    logger.info(f"New field {new_field.tag} is not unique")

            for x in new_tags:
                record.add_ordered_field(x)

        record_json = record.as_json()
        logger.info(f"Constructing MARC record: {record_json}")
        return record_json

    def __construct_new_field__(
        self, indicator_subfields: dict, tag_name: str
    ) -> "pymarc.Field":
        """Build a pymarc Field for ``tag_name`` from an indicator/subfield dict.

        ``indicator_subfields`` must provide 'ind1', 'ind2' and 'subfields'
        (a list of ``{code: value}`` dicts).
        """
        new_field = pymarc.Field(
            tag=tag_name, indicators=[indicator_subfields['ind1'], indicator_subfields['ind2']]  # type: ignore
        )
        for subfields in indicator_subfields['subfields']:
            self.__construct_new_subfields__(new_field, subfields)

        return new_field

    def __construct_new_subfields__(self, field: "pymarc.Field", subfields: dict):
        """Add each ``code: value`` pair in ``subfields`` to ``field`` and return it."""
        for sf_code, sf_val in subfields.items():
            field.add_subfield(sf_code, sf_val)

        return field

    def __tag_is_unique__(self, fields: list, new_field: "pymarc.Field") -> bool:
        """Return False when ``str(new_field)`` is already in ``fields``, else True."""
        new_field_string = str(new_field)
        if new_field_string in fields:
            logger.info(f"Skip adding duplicated {new_field_string} field")
            return False

        logger.info(f"{new_field_string} tag is unique")
        return True
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc