• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sul-dlss / libsys-airflow / 25748807077

12 May 2026 04:46PM UTC coverage: 87.868% (-0.2%) from 88.052%
25748807077

push

github

web-flow
Merge pull request #1625 from sul-dlss/airflow3-upgrade

Airflow3 upgrade :tada:

247 of 329 new or added lines in 53 files covered. (75.08%)

7 existing lines in 6 files now uncovered.

5439 of 6190 relevant lines covered (87.87%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.46
/libsys_airflow/plugins/shared/airflow_api_client.py
1
import os
1✔
2
import airflow_client.client
1✔
3
from pydantic import BaseModel
1✔
4
import httpx
1✔
5
import logging
1✔
6

7
# Module-level logger named after this module so records can be filtered per module.
logger = logging.getLogger(__name__)
1✔
8

9

10
class AirflowAccessToken(BaseModel):
    """Validated shape of a successful token response: a single string token."""

    # Required field; constructing the model without it fails validation.
    access_token: str
1✔
12

13

14
def get_access_token(
    host: str,
    username: str | None,
    password: str | None,
) -> str:
    """Request an access token from an Airflow API server.

    Sends the credentials as a JSON body in a POST to ``{host}/auth/token``
    and returns the ``access_token`` value from the response.

    Args:
        host: Base URL of the API server; ``/auth/token`` is appended to it.
        username: User name sent in the request payload.
        password: Password sent in the request payload.

    Returns:
        The access token string taken from the response body.

    Raises:
        RuntimeError: If the server replies with any status code other
            than 201.
        httpx.ConnectError: If the connection to the server fails; the
            error is logged and then re-raised unchanged.
    """
    url = f"{host}/auth/token"
    logger.info("Getting access token from %s", url)
    payload = {
        "username": username,
        "password": password,
    }
    headers = {"Content-Type": "application/json"}

    # Only the network call can raise ConnectError, so keep the try minimal.
    try:
        response = httpx.post(url, json=payload, headers=headers)
    except httpx.ConnectError as e:
        # Report through the module logger rather than stdout so the failure
        # follows the configured logging handlers.
        logger.error("Connection error: %s", e)
        raise

    # Guard clause: anything but 201 is treated as a failed token request.
    if response.status_code != 201:
        raise RuntimeError(
            f"Failed to get access token: {response.status_code} {response.text}"
        )

    return AirflowAccessToken(**response.json()).access_token
×
39

40

41
def api_client() -> airflow_client.client.ApiClient:
    """Build an authenticated Airflow API client.

    Credentials are read from the ``AIRFLOW_VAR_API_USER`` and
    ``AIRFLOW_VAR_API_PASSWORD`` environment variables, with placeholder
    defaults when they are unset. An access token is fetched with those
    credentials and attached to the configuration before the client is
    created.

    Returns:
        An ``ApiClient`` bound to the configured host and access token.
    """
    api_user = os.getenv("AIRFLOW_VAR_API_USER", "nausername")
    api_password = os.getenv("AIRFLOW_VAR_API_PASSWORD", "napassword")

    config = airflow_client.client.Configuration(
        host="http://airflow-apiserver:8080",
        username=api_user,
        password=api_password,
    )

    # Read the values back from the configuration object so the token request
    # uses exactly what the client itself is configured with.
    token = get_access_token(
        host=config.host,
        username=config.username,
        password=config.password,
    )
    config.access_token = token

    return airflow_client.client.ApiClient(config)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc