
sul-dlss / libsys-airflow / 10219143935

02 Aug 2024 04:49PM UTC coverage: 85.184% (-0.09%) from 85.275%

Pull Request #1125: Serializes MARC to XML and Compresses File
Merge 7c8f772c8 into 2dd074074

87 of 101 new or added lines in 7 files covered. (86.14%)

13 existing lines in 2 files now uncovered.

4134 of 4853 relevant lines covered (85.18%)

0.85 hits per line

Source File: /libsys_airflow/plugins/data_exports/transmission_tasks.py (92.75% covered)
import copy
import logging

import httpx
import pymarc

from pathlib import Path
from s3path import S3Path
from datetime import datetime

from airflow.decorators import task
from airflow.models.connection import Connection
from airflow.providers.ftp.hooks.ftp import FTPHook

from libsys_airflow.plugins.data_exports.oclc_api import (
    oclc_records_operation,
    get_instance_uuid,
)

from libsys_airflow.plugins.shared.utils import is_production

logger = logging.getLogger(__name__)

@task
def gather_files_task(**kwargs) -> dict:
    """
    Gets files to send to a vendor:
    looks for all the files in the data-export-files/{vendor}/marc-files folder.
    File glob patterns include "**/" to get the deletes, new, and updates folders
    regardless of date stamp.
    """
    logger.info("Gathering files to transmit")
    airflow = kwargs.get("airflow", "/opt/airflow")
    vendor = kwargs["vendor"]
    params = kwargs.get("params", {})
    bucket = params.get("bucket", {})
    marc_filepath = Path(airflow) / f"data-export-files/{vendor}/marc-files/"
    file_glob_pattern = vendor_fileformat_spec(vendor)
    if vendor == "full-dump":
        marc_filepath = S3Path(f"/{bucket}/data-export-files/{vendor}/marc-files/")
    marc_filelist = []
    for f in marc_filepath.glob(file_glob_pattern):
        # Skip empty files rather than transmitting them
        if f.stat().st_size == 0:
            continue
        marc_filelist.append(str(f))

    return {
        "file_list": marc_filelist,
        "s3": bool(bucket),
    }

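# A minimal sketch of how this task's output might be consumed in a DAG
# (hypothetical TaskFlow wiring, not part of this module):
#
#     gathered = gather_files_task(vendor="pod")
#     result = transmit_data_http_task(gathered, params={"vendor": "pod"})
#
# The returned dict pairs the file list with an "s3" flag so downstream
# tasks can reopen each file with either Path or S3Path.
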
@task
def retry_failed_files_task(**kwargs) -> dict:
    """
    Returns a list of files and an s3 boolean.
    Uses the list of failed files from XCom.
    """
    marc_filelist = []
    params = kwargs.get("params", {})
    bucket = params.get("bucket", {})
    if len(kwargs["files"]) == 0:
        logger.info("No failures to retry")
    else:
        logger.info("Retry failed files")
        marc_filelist = kwargs["files"]

    return {
        "file_list": marc_filelist,
        "s3": bool(bucket),
    }

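# Hypothetical example of feeding one run's failures back in for a retry:
#
#     transmitted = transmit_data_ftp_task("gobi", gathered)
#     retry = retry_failed_files_task(files=transmitted["failures"])
#
# An empty "files" list yields an empty file_list, so the retry branch
# becomes a no-op.
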
@task(multiple_outputs=True)
def gather_oclc_files_task(**kwargs) -> dict:
    """
    Gets deleted, new, and updated MARC files by library (SUL, Business,
    Hoover, Lane, and Law) to send to OCLC
    """
    airflow = kwargs.get("airflow", "/opt/airflow")
    libraries: dict = {
        "S7Z": [],  # Business
        "HIN": [],  # Hoover
        "CASUM": [],  # Lane
        "RCJ": [],  # Law
        "STF": [],  # SUL
    }
    output = {
        "deletes": copy.deepcopy(libraries),
        "new": copy.deepcopy(libraries),
        "updates": copy.deepcopy(libraries),
    }
    oclc_directory = Path(airflow) / "data-export-files/oclc/marc-files/"
    for marc_file_path in oclc_directory.glob("**/*.mrc"):
        type_of = marc_file_path.parent.name
        library = marc_file_path.stem.split("-")[1]
        output[type_of][library].append(str(marc_file_path))
    return output

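# The returned structure is keyed first by export type, then by OCLC
# library code, e.g. (values illustrative):
#
#     {
#         "new": {"STF": ["...-STF-new.mrc"], "RCJ": [], ...},
#         "updates": {...},
#         "deletes": {...},
#     }
#
# Filenames are assumed to carry the library code as the second
# hyphen-delimited token of the stem, which is what the split("-")[1]
# parsing relies on.
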
@task
def transmit_data_http_task(gather_files, **kwargs) -> dict:
    """
    Transmits the data via HTTP.
    Returns lists of files successfully transmitted and failures.
    """
    if not is_production():
        return return_success_test_instance(gather_files)
    success = []
    failures = []
    files_params = kwargs.get("files_params", "upload")
    params = kwargs.get("params", {})
    conn_id = params["vendor"]
    logger.info(f"Transmit data to {conn_id}")
    connection = Connection.get_connection_from_secrets(conn_id)
    if gather_files["s3"]:
        path_module = S3Path
    else:
        path_module = Path
    with httpx.Client(
        headers=connection.extra_dejson,
        params=vendor_url_params(conn_id, gather_files["s3"]),
        follow_redirects=True,
    ) as client:
        for f in gather_files["file_list"]:
            files = {files_params: path_module(f).open("rb")}
            request = client.build_request("POST", connection.host, files=files)
            try:
                logger.info(f"Start transmission of data from file {f}")
                response = client.send(request)
                response.raise_for_status()
                success.append(f)
                logger.info(f"End transmission of data from file {f}")
            except httpx.HTTPError as e:
                logger.error(f"Error for {e.request.url} - {e}")
                failures.append(f)

    return {"success": success, "failures": failures}

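# This task assumes an Airflow connection whose host is the vendor's
# upload URL and whose extras are sent verbatim as HTTP headers. A
# hypothetical connection definition might look like:
#
#     Connection(
#         conn_id="pod",
#         conn_type="http",
#         host="https://pod.example.edu/upload",
#         extra='{"Authorization": "Bearer ..."}',
#     )
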
@task
def transmit_data_ftp_task(conn_id, gather_files) -> dict:
    """
    Transmits the data via FTP.
    Returns lists of files successfully transmitted and failures.
    """
    if not is_production():
        return return_success_test_instance(gather_files)
    hook = FTPHook(ftp_conn_id=conn_id)
    connection = Connection.get_connection_from_secrets(conn_id)
    remote_path = connection.extra_dejson["remote_path"]
    success = []
    failures = []
    for f in gather_files["file_list"]:
        remote_file_name = vendor_filename_spec(conn_id, f)
        remote_file_path = f"{remote_path}/{remote_file_name}"
        try:
            logger.info(f"Start transmission of file {f}")
            hook.store_file(remote_file_path, f)
            success.append(f)
            logger.info(f"End transmission of file {f}")
        except Exception as e:
            logger.error(e)
            logger.error(f"Exception for transmission of file {f}")
            failures.append(f)

    return {"success": success, "failures": failures}

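# The FTP connection is expected to carry the vendor's target directory
# in its extras, e.g. (illustrative value):
#
#     extra='{"remote_path": "/incoming/marc"}'
#
# so a local file new/20240802.txt sent for "gobi" would be stored as
# /incoming/marc/stf20240802.txt per vendor_filename_spec.
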
@task(multiple_outputs=True)
def delete_from_oclc_task(connection_details: list, delete_records: dict) -> dict:
    connection_lookup = oclc_connections(connection_details)

    return oclc_records_operation(
        connections=connection_lookup,
        oclc_function="delete",
        records=delete_records,
    )

def __filter_save_marc__(file_str: str, instance_uuids: list):
    new_records = []
    file_path = Path(file_str)
    with file_path.open('rb') as fo:
        marc_reader = pymarc.MARCReader(fo)
        for record in marc_reader:
            instance_uuid = get_instance_uuid(record)
            if instance_uuid in instance_uuids:
                new_records.append(record)
    if len(new_records) > 0:
        logger.info(f"Replacing {len(new_records)} records in {file_path}")
        with file_path.open('wb+') as fo:
            marc_writer = pymarc.MARCWriter(fo)
            for record in new_records:
                marc_writer.write(record)

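# Note that this helper rewrites the MARC file in place, keeping only
# records whose instance UUID appears in instance_uuids. If no records
# match, the file is left untouched rather than truncated.
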
@task(multiple_outputs=True)
def filter_new_marc_records_task(new_records: dict, new_instance_uuids: dict) -> dict:
    filtered_new_records: dict = {}
    for library, uuids in new_instance_uuids.items():
        filtered_new_records[library] = []
        if len(uuids) > 0:
            new_files = new_records[library]
            for row in new_files:
                __filter_save_marc__(row, uuids)
            filtered_new_records[library] = new_files

    return filtered_new_records

@task(multiple_outputs=True)
def match_oclc_task(connection_details: list, new_records: dict) -> dict:
    connection_lookup = oclc_connections(connection_details)

    return oclc_records_operation(
        connections=connection_lookup,
        oclc_function="match",
        records=new_records,
    )


@task(multiple_outputs=True)
def new_to_oclc_task(connection_details: list, new_records: dict) -> dict:
    connection_lookup = oclc_connections(connection_details)

    return oclc_records_operation(
        connections=connection_lookup,
        oclc_function="new",
        records=new_records,
    )


@task(multiple_outputs=True)
def set_holdings_oclc_task(connection_details: list, update_records: dict) -> dict:
    connection_lookup = oclc_connections(connection_details)

    return oclc_records_operation(
        connections=connection_lookup,
        oclc_function="update",
        records=update_records,
    )

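# delete_from_oclc_task, match_oclc_task, new_to_oclc_task, and
# set_holdings_oclc_task are thin wrappers that differ only in the
# oclc_function passed to oclc_records_operation. A hypothetical
# invocation (conn_ids invented for illustration):
#
#     matched = match_oclc_task(
#         connection_details=["sul-oclc", "law-oclc"],
#         new_records=gathered["new"],
#     )
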
@task
def consolidate_oclc_archive_files(
    deleted: list, matched: list, new_files: list, updated: list
) -> list:
    # De-duplicate across the four result lists before archiving
    unique_files = set(deleted + matched + new_files + updated)
    return list(unique_files)

@task
def archive_transmitted_data_task(files):
    """
    Given a list of successfully transmitted files, moves the files to a
    'transmitted' folder under each data-export-files/{vendor}.
    Also moves the instanceid file with the same vendor and filename,
    and the xml or gz files with the same filename as what was
    transmitted (e.g. for GOBI txt files).
    """
    logger.info("Moving transmitted files to archive directory")

    if len(files) < 1:
        logger.warning("No files to archive")
        return

    archive_dir = Path(files[0]).parent.parent.parent / "transmitted"
    archive_dir.mkdir(exist_ok=True)
    for x in files:
        kind = Path(x).parent.name
        # original_transmitted_file_path = data-export-files/{vendor}/marc-files/new|updates|deletes/*.xml|*.gz|*.txt
        original_transmitted_file_path = Path(x)

        # archive_path = data-export-files/{vendor}/transmitted/new|updates|deletes
        archive_path = archive_dir / kind
        archive_path.mkdir(exist_ok=True)
        archive_path = archive_path / original_transmitted_file_path.name

        # instance_path = data-export-files/{vendor}/instanceids/new|updates|deletes/*.csv
        instance_path = (
            original_transmitted_file_path.parent.parent.parent
            / f"instanceids/{kind}/{original_transmitted_file_path.stem}.csv"
        )
        instance_archive_path = archive_dir / kind / instance_path.name

        marc_path = (
            original_transmitted_file_path.parent
            / f"{original_transmitted_file_path.stem}.xml"
        )
        marc_archive_path = archive_dir / kind / marc_path.name

        # move transmitted files (for GOBI these are *.txt files; for POD, *.gz files)
        logger.info(
            f"Moving transmitted file {original_transmitted_file_path} to {archive_path}"
        )
        original_transmitted_file_path.replace(archive_path)

        # move instance id files with the same stem as the transmitted filename
        if instance_path.exists():
            logger.info(
                f"Moving related instanceid file {instance_path} to {instance_archive_path}"
            )
            instance_path.replace(instance_archive_path)

        # move marc files with the same stem as the transmitted filename
        # (when the transmitted file is not *.xml)
        if marc_path.exists():
            logger.info(f"Moving related marc file {marc_path} to {marc_archive_path}")
            marc_path.replace(marc_archive_path)

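# Illustrative before/after layout for a single archived GOBI file
# (filenames hypothetical; the sibling .csv and .xml moves only happen
# when those files exist):
#
#     data-export-files/gobi/marc-files/new/20240802.txt
#       -> data-export-files/gobi/transmitted/new/20240802.txt
#     data-export-files/gobi/instanceids/new/20240802.csv
#       -> data-export-files/gobi/transmitted/new/20240802.csv
#     data-export-files/gobi/marc-files/new/20240802.xml
#       -> data-export-files/gobi/transmitted/new/20240802.xml
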
def vendor_fileformat_spec(vendor):
    """
    Returns a file glob pattern depending on the vendor's requirement for
    uncompressed or compressed MARCXML or text files
    """
    match vendor:
        case "pod":
            return "**/*.gz"
        case "full-dump":
            return "**/*.gz"
        case "gobi":
            return "**/*.txt"
        case _:
            return "**/*.xml"

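# For example, vendor_fileformat_spec("pod") yields "**/*.gz", so
# gather_files_task only picks up compressed exports for POD, while
# unrecognized vendors fall through to uncompressed MARCXML ("**/*.xml").
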
def vendor_filename_spec(conn_id, filename):
    """
    Returns a filename per the vendor's file naming convention
    """
    if conn_id == "gobi":
        # gobi should have "stf" prepended
        return "stf" + Path(filename).name
    elif conn_id == "sharevde":
        return "tbd"
    else:
        return Path(filename).name

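# Hypothetical examples of the naming convention ("backstage" is an
# invented conn_id to show the default branch; the "sharevde" convention
# is still marked "tbd"):
#
#     vendor_filename_spec("gobi", "/opt/airflow/.../new/20240802.txt")
#     # -> "stf20240802.txt"
#     vendor_filename_spec("backstage", "/opt/airflow/.../new/20240802.mrc")
#     # -> "20240802.mrc"
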
def vendor_url_params(conn_id, is_s3_path) -> dict:
    """
    Sets vendor-specific URL parameters for transmitting data with the httpx client
    """
    if conn_id == "pod" and is_s3_path:
        url_params = {"stream": datetime.now().strftime('%Y-%m-%d')}
        logger.info(f"Setting URL params to {url_params}")
    else:
        url_params = {}

    return url_params

def return_success_test_instance(files) -> dict:
    logger.info("SKIPPING TRANSMISSION")
    return {"success": files["file_list"], "failures": []}

def oclc_connections(connection_details: list) -> dict:
    connection_lookup = {}
    for conn_id in connection_details:
        connection = Connection.get_connection_from_secrets(conn_id)
        oclc_code = connection.extra_dejson["oclc_code"]
        connection_lookup[oclc_code] = {
            "username": connection.login,
            "password": connection.password,
        }
    return connection_lookup
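
# Each Airflow connection is expected to carry its OCLC library code in
# the extras ("oclc_code"); the resulting lookup is keyed by that code,
# e.g. (illustrative values):
#
#     {"STF": {"username": "...", "password": "..."}}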