liqd/roots build 22072536647 (first build) · 16 Feb 2026 05:41PM UTC · coverage: 42.093%

Pull Request #59: apps/summerization: Integrate Document Summary into Workflow

51 of 314 new or added lines in 7 files covered (16.24%)
3564 of 8467 relevant lines covered (42.09%)
0.42 hits per line

Source file: /apps/summarization/utils.py (16.67% covered)
"""Utility functions for document processing."""

import io
import json
from pathlib import Path

import fitz  # PyMuPDF
import requests
from docx import Document
from pydantic_ai.messages import BinaryContent
from pydantic_ai.messages import BinaryImage


# TODO: Deprecated? We use only URLs now.
def read_document(doc_path: Path) -> BinaryImage | BinaryContent:
    """Read document file and return BinaryImage or BinaryContent."""
    doc_path = Path(doc_path) if not isinstance(doc_path, Path) else doc_path

    if not doc_path.exists():
        raise FileNotFoundError(f"File not found: {doc_path}")

    ext = doc_path.suffix.lower()
    media_type_map = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".pdf": "application/pdf",
    }
    media_type = media_type_map.get(ext, "image/jpeg")

    with open(doc_path, "rb") as f:
        file_data = f.read()

    if media_type.startswith("image/"):
        return BinaryImage(data=file_data, media_type=media_type)
    else:
        return BinaryContent(data=file_data, media_type=media_type)
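
A minimal usage sketch of read_document (the file paths are hypothetical, for illustration only); per the mapping above, unknown extensions fall back to image/jpeg:

part = read_document(Path("samples/contract.pdf"))  # BinaryContent with media_type "application/pdf"
scan = read_document(Path("samples/receipt.png"))   # BinaryImage with media_type "image/png"
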
def _is_image_url(url: str, image_extensions: tuple, image_keywords: tuple) -> bool:
    """Check if string is an image URL."""
    if not isinstance(url, str):
        return False
    url_lower = url.lower()
    if not (url_lower.startswith("http://") or url_lower.startswith("https://")):
        return False
    return url_lower.endswith(image_extensions) or any(
        keyword in url_lower for keyword in image_keywords
    )


# TODO: Deprecated? Pass image_urls directly to the request?
def _extract_from_value(
    value, urls: list, image_extensions: tuple, image_keywords: tuple
) -> None:
    """Recursively extract image URLs from JSON value."""
    if isinstance(value, str):
        if _is_image_url(value, image_extensions, image_keywords):
            urls.append(value)
    elif isinstance(value, dict):
        for key, val in value.items():
            if any(keyword in key.lower() for keyword in image_keywords):
                if isinstance(val, str) and _is_image_url(
                    val, image_extensions, image_keywords
                ):
                    urls.append(val)
            else:
                _extract_from_value(val, urls, image_extensions, image_keywords)
    elif isinstance(value, list):
        for item in value:
            _extract_from_value(item, urls, image_extensions, image_keywords)


# TODO: Deprecated? Pass image_urls directly to the request?
def extract_image_urls_from_json(json_data: str | dict | list) -> list[str]:
    """Extract image URLs from JSON structure."""
    urls = []
    image_extensions = (
        ".jpg",
        ".jpeg",
        ".png",
        ".gif",
        ".webp",
        ".bmp",
        ".tiff",
        ".tif",
    )
    image_keywords = ("image", "img", "photo", "picture", "url", "src", "href")

    if isinstance(json_data, str):
        try:
            json_data = json.loads(json_data)
        except json.JSONDecodeError:
            return []

    _extract_from_value(json_data, urls, image_extensions, image_keywords)
    return list(dict.fromkeys(urls))
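
A worked example of the extraction logic above, on made-up data: keys matching image_keywords must hold a string that passes _is_image_url, everything else is walked recursively, and the result is deduplicated in order of first appearance.

payload = {
    "title": "Quarterly report",                     # no keyword in key, value is not a URL
    "cover_image": "https://example.com/cover.png",  # key matches "image", URL kept
    "attachments": [
        {"src": "https://example.com/scan.jpg"},     # key matches "src", URL kept
        {"href": "https://example.com/notes.txt"},   # key matches "href" but value fails _is_image_url
    ],
}
print(extract_image_urls_from_json(payload))
# ['https://example.com/cover.png', 'https://example.com/scan.jpg']
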
def download_document(url: str, timeout: int = 30) -> bytes:
    """Download document from URL and return as bytes."""
    try:
        response = requests.get(url, timeout=timeout, stream=True)
        response.raise_for_status()
        return response.content
    except requests.Timeout:
        raise requests.RequestException(
            f"Timeout while downloading document from {url}"
        )
    except requests.RequestException as e:
        raise requests.RequestException(
            f"Failed to download document from {url}: {str(e)}"
        )


def extract_text_from_pdf(pdf_bytes: bytes) -> str:
    """Extract text from PDF document."""
    try:
        pdf_document = fitz.open(stream=pdf_bytes, filetype="pdf")
        text_parts = []

        for page_num in range(len(pdf_document)):
            page = pdf_document[page_num]
            text = page.get_text()
            if text.strip():
                text_parts.append(text)

        pdf_document.close()

        if not text_parts:
            raise ValueError(
                "No text could be extracted from PDF (may be image-only or encrypted)"
            )

        return "\n\n".join(text_parts)
    except Exception as e:
        raise ValueError(f"Failed to extract text from PDF: {str(e)}")


def extract_text_from_docx(docx_bytes: bytes) -> str:
    """Extract text from DOCX document."""
    try:
        docx_file = io.BytesIO(docx_bytes)
        doc = Document(docx_file)

        text_parts = []
        for paragraph in doc.paragraphs:
            if paragraph.text.strip():
                text_parts.append(paragraph.text)

        if not text_parts:
            raise ValueError("No text could be extracted from DOCX document")

        return "\n\n".join(text_parts)
    except Exception as e:
        raise ValueError(f"Failed to extract text from DOCX: {str(e)}")
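
A quick round-trip check of the DOCX path, reusing the io and Document imports above to build an in-memory file (illustrative sketch, not part of the module):

buffer = io.BytesIO()
generated = Document()  # python-docx: new empty document
generated.add_paragraph("Hello from a generated DOCX.")
generated.save(buffer)

print(extract_text_from_docx(buffer.getvalue()))
# Hello from a generated DOCX.
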
def extract_text_from_document(url: str) -> str:
    """Extract text from PDF or DOCX document downloaded from URL."""
    url_lower = url.lower()
    document_bytes = download_document(url)

    if url_lower.endswith(".pdf"):
        return extract_text_from_pdf(document_bytes)
    elif url_lower.endswith(".docx"):
        return extract_text_from_docx(document_bytes)
    else:
        raise ValueError(
            "Unsupported document format. Only PDF (.pdf) and DOCX (.docx) are supported."
        )
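
To tie the pieces together, a hedged end-to-end sketch of how the workflow might call these helpers; the URL and the import path apps.summarization.utils are assumptions, not taken from the PR:

import requests

from apps.summarization.utils import extract_text_from_document

doc_url = "https://example.com/reports/q3-summary.pdf"  # hypothetical document URL
try:
    # Downloads the file, then dispatches on the extension (.pdf or .docx only).
    text = extract_text_from_document(doc_url)
    print(f"Extracted {len(text)} characters")
except (ValueError, requests.RequestException) as exc:
    print(f"Could not extract text: {exc}")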