• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SamhammerAG / sag_py_fastapi_health / 4698573451

pending completion
4698573451

push

github

oschulz
KIT-2457 ignore .idea

153 of 157 relevant lines covered (97.45%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/sag_py_fastapi_health/checks/http.py
1
from http import HTTPStatus
1✔
2
from traceback import format_exc
1✔
3
from typing import Optional
1✔
4

5
from sag_py_fastapi_health.models import Check, CheckResult
1✔
6

7
try:
1✔
8
    from aiohttp import BasicAuth, ClientResponseError, ClientSession, ClientTimeout, TCPConnector
1✔
9
except ImportError as exc:
×
10
    raise ImportError("Using this module requires the aiohttp library.") from exc
×
11

12

13
class HttpCheck(Check):
    """Health check that performs an HTTP GET against a configured URL.

    The check reports "Healthy" when the response is ``ok`` and "Unhealthy"
    otherwise. Server errors (status >= 500) and, when a username is
    configured, 401/403 responses are surfaced through ``raise_for_status``
    so that the error text becomes the result description.
    """

    def __init__(
        self,
        url: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        verify_ssl: bool = True,
        timeout: float = 60.0,
        name: str = "HTTP",
    ) -> None:
        """Store the settings used when the check is executed.

        Args:
            url: Address requested with GET.
            username: Basic-auth user; authentication is only sent when this is truthy.
            password: Basic-auth password; an empty string is sent when it is missing.
            verify_ssl: Forwarded to the ``TCPConnector`` as ``verify_ssl``.
            timeout: Value handed to aiohttp's ``ClientTimeout``.
            name: Name reported in every ``CheckResult`` produced by this check.
        """
        self._name: str = name
        self._url: str = url
        self._username: Optional[str] = username
        self._password: Optional[str] = password
        self._verify_ssl: bool = verify_ssl
        self._timeout: float = timeout

    async def __call__(self) -> CheckResult:
        """Run the request and translate the outcome into a ``CheckResult``.

        Returns:
            A result carrying this check's name, a "Healthy"/"Unhealthy"
            status and a description: the received status code, the text of
            the ``ClientResponseError``, or a formatted traceback for any
            other exception.
        """
        try:
            # Built in the same order the session arguments were originally evaluated.
            connector = TCPConnector(verify_ssl=self._verify_ssl, enable_cleanup_closed=True)
            credentials = None
            if self._username:
                credentials = BasicAuth(self._username, self._password or "")
            request_timeout = ClientTimeout(self._timeout)

            async with ClientSession(
                connector=connector,
                auth=credentials,
                timeout=request_timeout,
            ) as session, session.get(self._url) as response:
                # Rejected credentials only count as an error when credentials were configured.
                if response.status >= HTTPStatus.INTERNAL_SERVER_ERROR or (
                    self._username and response.status in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN)
                ):
                    response.raise_for_status()

                if response.ok:
                    health = "Healthy"
                else:
                    health = "Unhealthy"
                return CheckResult(
                    name=self._name,
                    status=health,
                    description=f"Got status {response.status}",
                )
        except ClientResponseError as response_error:
            message = str(response_error)
            return CheckResult(name=self._name, status="Unhealthy", description=message)
        except Exception:
            # Any other failure is reported with its full traceback.
            trace = format_exc()
            return CheckResult(name=self._name, status="Unhealthy", description=trace)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc