deepset-ai / haystack
Pull Request #10111: fix: ensure that headers keys are unique in link_content
20 Nov 2025 06:02PM UTC coverage: 91.444% (13723 of 15007 relevant lines covered)

haystack/components/fetchers/link_content.py (file coverage: 86.1%)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from fnmatch import fnmatch
from typing import Callable, Optional, cast

import httpx
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from haystack import component, logging
from haystack.dataclasses import ByteStream
from haystack.lazy_imports import LazyImport
from haystack.version import __version__

# HTTP/2 support via lazy import
with LazyImport("Run 'pip install httpx[http2]' to use HTTP/2 support") as h2_import:
    pass  # nothing to import as we simply set the http2 attribute, library handles the rest

logger = logging.getLogger(__name__)

DEFAULT_USER_AGENT = f"haystack/LinkContentFetcher/{__version__}"

REQUEST_HEADERS = {
    "accept": "*/*",
    "User-Agent": DEFAULT_USER_AGENT,
    "Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7",
    "referer": "https://www.google.com/",
}


def _merge_headers(*headers: dict[str, str]) -> dict[str, str]:
    """
    Merge several header dicts case-insensitively, with later dicts taking precedence.

    :param headers: The header dicts to merge.
    :returns: The merged dict, with each key keeping the casing of its last occurrence.
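
    A minimal illustrative example of the last-wins, case-insensitive merge:

    >>> _merge_headers({"User-Agent": "default"}, {"user-agent": "custom"})
    {'user-agent': 'custom'}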
    """
    merged = {}
    keymap = {}

    for d in headers:
        for k, v in d.items():
            kl = k.lower()
            keymap[kl] = k
            merged[kl] = v

    return {keymap[kl]: v for kl, v in merged.items()}


def _text_content_handler(response: httpx.Response) -> ByteStream:
    """
    Handles text content.

    :param response: Response object from the request.
    :returns: The extracted text.
    """
    return ByteStream.from_string(response.text)


def _binary_content_handler(response: httpx.Response) -> ByteStream:
    """
    Handles binary content.

    :param response: Response object from the request.
    :returns: The extracted binary file-like object.
    """
    return ByteStream(data=response.content)


@component
class LinkContentFetcher:
    """
    Fetches and extracts content from URLs.

    It supports various content types, retries on failures, and automatic user-agent rotation for failed web
    requests. Use it as the data-fetching step in your pipelines.

    You may need to convert LinkContentFetcher's output into a list of documents. Use the HTMLToDocument
    converter to do this.

    ### Usage example

    ```python
    from haystack.components.fetchers.link_content import LinkContentFetcher

    fetcher = LinkContentFetcher()
    streams = fetcher.run(urls=["https://www.google.com"])["streams"]

    assert len(streams) == 1
    assert streams[0].meta == {'content_type': 'text/html', 'url': 'https://www.google.com'}
    assert streams[0].data
    ```

    For async usage:

    ```python
    import asyncio
    from haystack.components.fetchers import LinkContentFetcher

    async def fetch_async():
        fetcher = LinkContentFetcher()
        result = await fetcher.run_async(urls=["https://www.google.com"])
        return result["streams"]

    streams = asyncio.run(fetch_async())
    ```
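
    You can also pass extra request headers. They are merged case-insensitively with the component defaults,
    so a custom `"accept"` header, for example, replaces the default one rather than being sent twice. A minimal
    sketch (the URL is only illustrative):

    ```python
    fetcher = LinkContentFetcher(request_headers={"accept": "application/json"})
    streams = fetcher.run(urls=["https://example.com/data.json"])["streams"]
    ```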
    """
112

113
    def __init__(  # pylint: disable=too-many-positional-arguments
1✔
114
        self,
115
        raise_on_failure: bool = True,
116
        user_agents: Optional[list[str]] = None,
117
        retry_attempts: int = 2,
118
        timeout: int = 3,
119
        http2: bool = False,
120
        client_kwargs: Optional[dict] = None,
121
        request_headers: Optional[dict[str, str]] = None,
122
    ):
123
        """
124
        Initializes the component.
125

126
        :param raise_on_failure: If `True`, raises an exception if it fails to fetch a single URL.
127
            For multiple URLs, it logs errors and returns the content it successfully fetched.
128
        :param user_agents: [User agents](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent)
129
            for fetching content. If `None`, a default user agent is used.
130
        :param retry_attempts: The number of times to retry to fetch the URL's content.
131
        :param timeout: Timeout in seconds for the request.
132
        :param http2: Whether to enable HTTP/2 support for requests. Defaults to False.
133
                     Requires the 'h2' package to be installed (via `pip install httpx[http2]`).
134
        :param client_kwargs: Additional keyword arguments to pass to the httpx client.
135
                     If `None`, default values are used.
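
        A configuration sketch with illustrative values (any of these arguments can be omitted):

        ```python
        fetcher = LinkContentFetcher(retry_attempts=3, timeout=10, http2=True)
        ```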
        """
        self.raise_on_failure = raise_on_failure
        self.user_agents = user_agents or [DEFAULT_USER_AGENT]
        self.current_user_agent_idx: int = 0
        self.retry_attempts = retry_attempts
        self.timeout = timeout
        self.http2 = http2
        self.client_kwargs = client_kwargs or {}
        self.request_headers = request_headers or {}

        # Configure default client settings
        self.client_kwargs.setdefault("timeout", timeout)
        self.client_kwargs.setdefault("follow_redirects", True)

        # Create httpx clients
        client_kwargs = {**self.client_kwargs}

        # Optional HTTP/2 support
        if http2:
            try:
                h2_import.check()
                client_kwargs["http2"] = True
            except ImportError:
                logger.warning(
                    "HTTP/2 support requested but 'h2' package is not installed. "
                    "Falling back to HTTP/1.1. Install with `pip install httpx[http2]` to enable HTTP/2 support."
                )
                self.http2 = False  # Update the setting to match actual capability

        # Initialize synchronous client
        self._client = httpx.Client(**client_kwargs)

        # Initialize asynchronous client
        self._async_client = httpx.AsyncClient(**client_kwargs)

        # register default content handlers that extract data from the response
        self.handlers: dict[str, Callable[[httpx.Response], ByteStream]] = defaultdict(lambda: _text_content_handler)
        self.handlers["text/*"] = _text_content_handler
        self.handlers["text/html"] = _binary_content_handler
        self.handlers["application/json"] = _text_content_handler
        self.handlers["application/*"] = _binary_content_handler
        self.handlers["image/*"] = _binary_content_handler
        self.handlers["audio/*"] = _binary_content_handler
        self.handlers["video/*"] = _binary_content_handler

        @retry(
            reraise=True,
            stop=stop_after_attempt(self.retry_attempts),
            wait=wait_exponential(multiplier=1, min=2, max=10),
            retry=(retry_if_exception_type((httpx.HTTPStatusError, httpx.RequestError))),
            # This method is invoked only after failed requests (exception raised)
            after=self._switch_user_agent,
        )
        def get_response(url):
            response = self._client.get(url, headers=self._get_headers())
            response.raise_for_status()
            return response

        self._get_response: Callable = get_response

    def _get_headers(self):
        """
        Build the request headers, with later sources taking precedence:

        client defaults -> component defaults -> user-provided request_headers -> rotating User-Agent
        """
        base = dict(self._client.headers)
        headers = _merge_headers(
            base, REQUEST_HEADERS, self.request_headers, {"User-Agent": self.user_agents[self.current_user_agent_idx]}
        )
        return headers

    def __del__(self):
        """
        Clean up resources when the component is deleted.

        Closes the synchronous HTTP client to prevent resource leaks. The asynchronous client cannot be closed
        here because closing it requires an await.
        """
        try:
            # Close the synchronous client if it exists
            if hasattr(self, "_client"):
                self._client.close()

            # There is no way to close the async client without await
        except Exception:
            # Suppress any exceptions during cleanup
            pass

    @component.output_types(streams=list[ByteStream])
    def run(self, urls: list[str]):
        """
        Fetches content from a list of URLs and returns a list of extracted content streams.

        Each content stream is a `ByteStream` object containing the extracted content as binary data.
        Each ByteStream object in the returned list corresponds to the contents of a single URL.
        The content type of each stream is stored in the metadata of the ByteStream object under
        the key "content_type". The URL of the fetched content is stored under the key "url".

        :param urls: A list of URLs to fetch content from.
        :returns: `ByteStream` objects representing the extracted content.

        :raises Exception: If the provided list of URLs contains only a single URL and `raise_on_failure` is set to
            `True`, an exception is raised in case of an error during content retrieval.
            In all other scenarios, any retrieval errors are logged, and a list of successfully retrieved `ByteStream`
            objects is returned.
        """
        streams: list[ByteStream] = []
        if not urls:
            return {"streams": streams}

        # don't use multithreading if there's only one URL
        if len(urls) == 1:
            stream_metadata, stream = self._fetch(urls[0])
            stream.meta.update(stream_metadata)
            stream.mime_type = stream.meta.get("content_type", None)
            streams.append(stream)
        else:
            with ThreadPoolExecutor() as executor:
                results = executor.map(self._fetch_with_exception_suppression, urls)

            for stream_metadata, stream in results:  # type: ignore
                if stream_metadata is not None and stream is not None:
                    stream.meta.update(stream_metadata)
                    stream.mime_type = stream.meta.get("content_type", None)
                    streams.append(stream)

        return {"streams": streams}

    @component.output_types(streams=list[ByteStream])
    async def run_async(self, urls: list[str]):
        """
        Asynchronously fetches content from a list of URLs and returns a list of extracted content streams.

        This is the asynchronous version of the `run` method with the same parameters and return values.

        :param urls: A list of URLs to fetch content from.
        :returns: `ByteStream` objects representing the extracted content.
        """
        streams: list[ByteStream] = []
        if not urls:
            return {"streams": streams}

        # Create tasks for all URLs using _fetch_async directly
        tasks = [self._fetch_async(url, self._async_client) for url in urls]

        # Only capture exceptions when we have multiple URLs or raise_on_failure=False
        # This ensures errors propagate appropriately for single URLs with raise_on_failure=True
        return_exceptions = not (len(urls) == 1 and self.raise_on_failure)
        results = await asyncio.gather(*tasks, return_exceptions=return_exceptions)

        # Process results
        for i, result in enumerate(results):
            # Handle exception results (only happens when return_exceptions=True)
            if isinstance(result, Exception):
                logger.warning("Error fetching {url}: {error}", url=urls[i], error=str(result))
                # Add an empty result for failed URLs when raise_on_failure=False
                if not self.raise_on_failure:
                    streams.append(ByteStream(data=b"", meta={"content_type": "Unknown", "url": urls[i]}))
                continue

            # Process successful results
            # At this point, result is not an exception, so we need to cast it to the correct type for mypy
            if not isinstance(result, Exception):  # Runtime check
                # Use cast to tell mypy that result is the tuple type returned by _fetch_async
                result_tuple = cast(tuple[Optional[dict[str, str]], Optional[ByteStream]], result)
                stream_metadata, stream = result_tuple
                if stream_metadata is not None and stream is not None:
                    stream.meta.update(stream_metadata)
                    stream.mime_type = stream.meta.get("content_type", None)
                    streams.append(stream)

        return {"streams": streams}

    def _fetch(self, url: str) -> tuple[dict[str, str], ByteStream]:
        """
        Fetches content from a URL and returns it as a ByteStream.

        :param url: The URL to fetch content from.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
            ByteStream metadata contains the URL and the content type of the fetched content.
            The content type is a string indicating the type of content fetched (for example, "text/html",
            "application/pdf"). The ByteStream object contains the fetched content as binary data.

        :raises: If an error occurs during content retrieval and `raise_on_failure` is set to True, this method will
            raise an exception. Otherwise, all fetching errors are logged, and an empty ByteStream is returned.
        """
        content_type: str = "text/html"
        stream: ByteStream = ByteStream(data=b"")
        try:
            response = self._get_response(url)
            content_type = self._get_content_type(response)
            handler: Callable = self._resolve_handler(content_type)
            stream = handler(response)
        except Exception as e:
            if self.raise_on_failure:
                raise e
            # less verbose log as this is expected to happen often (requests failing, blocked, etc.)
            logger.debug("Couldn't retrieve content from {url} because {error}", url=url, error=str(e))
        finally:
            self.current_user_agent_idx = 0

        return {"content_type": content_type, "url": url}, stream

    async def _fetch_async(
        self, url: str, client: httpx.AsyncClient
    ) -> tuple[Optional[dict[str, str]], Optional[ByteStream]]:
        """
        Asynchronously fetches content from a URL and returns it as a ByteStream.

        :param url: The URL to fetch content from.
        :param client: The async httpx client to use for making requests.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
        """
        content_type: str = "text/html"
        stream: Optional[ByteStream] = None
        metadata: Optional[dict[str, str]] = None

        try:
            response = await self._get_response_async(url, client)
            content_type = self._get_content_type(response)
            handler: Callable = self._resolve_handler(content_type)
            stream = handler(response)
            metadata = {"content_type": content_type, "url": url}
        except Exception as e:
            if self.raise_on_failure:
                raise e
            logger.debug("Couldn't retrieve content from {url} because {error}", url=url, error=str(e))
            # Create an empty ByteStream for failed requests when raise_on_failure is False
            stream = ByteStream(data=b"")
            metadata = {"content_type": content_type, "url": url}
        finally:
            self.current_user_agent_idx = 0

        return metadata, stream

    def _fetch_with_exception_suppression(self, url: str) -> tuple[Optional[dict[str, str]], Optional[ByteStream]]:
        """
        Fetches content from a URL and returns it as a ByteStream.

        If `raise_on_failure` is set to True, this method wraps the `_fetch()` call and catches any exceptions so
        that a single failing URL does not abort the remaining fetches. Otherwise, it simply calls `_fetch()`,
        which already suppresses errors on its own.

        :param url: The URL to fetch content from.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
        """
        if self.raise_on_failure:
            try:
                return self._fetch(url)
            except Exception as e:
                logger.warning("Error fetching {url}: {error}", url=url, error=str(e))
                return {"content_type": "Unknown", "url": url}, None
        else:
            return self._fetch(url)

    async def _get_response_async(self, url: str, client: httpx.AsyncClient) -> httpx.Response:
        """
        Asynchronously gets a response from a URL with retry logic.

        :param url: The URL to fetch.
        :param client: The async httpx client to use for making requests.
        :returns: The httpx Response object.
        """
        attempt = 0
        last_exception = None

        while attempt <= self.retry_attempts:
            try:
                response = await client.get(url, headers=self._get_headers())
                response.raise_for_status()
                return response
            except (httpx.HTTPStatusError, httpx.RequestError) as e:
                last_exception = e
                attempt += 1
                if attempt <= self.retry_attempts:
                    self._switch_user_agent(None)  # Switch user agent for next retry
                    # Wait before retry using exponential backoff
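                    # (e.g. roughly 2s after the first failure, 4s after the second, never more than 10s)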
                    await asyncio.sleep(min(2 * 2 ** (attempt - 1), 10))
                else:
                    break

        # If we've exhausted all retries, raise the last exception
        if last_exception:
            raise last_exception

        # This should never happen, but just in case
        raise httpx.RequestError("Failed to get response after retries", request=None)

    def _get_content_type(self, response: httpx.Response):
        """
        Get the content type of the response.

        :param response: The response object.
        :returns: The content type of the response.
        """
        content_type = response.headers.get("Content-Type", "")
        return content_type.split(";")[0]

    def _resolve_handler(self, content_type: str) -> Callable[[httpx.Response], ByteStream]:
        """
        Resolves the handler for the given content type.

        First, it tries to find a direct match for the content type in the handlers dictionary.
        If no direct match is found, it tries to find a pattern match using the fnmatch function.
        If no pattern match is found, it returns the default handler for text/plain.

        :param content_type: The content type to resolve the handler for.
        :returns: The handler for the given content type, if found. Otherwise, the default handler for text/plain.
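
        For example, "image/png" has no direct entry in the handlers dict, but it matches the "image/*"
        pattern and is therefore handled as binary content.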
        """
        # direct match
        if content_type in self.handlers:
            return self.handlers[content_type]

        # pattern matches
        for pattern, handler in self.handlers.items():
            if fnmatch(content_type, pattern):
                return handler

        # default handler
        return self.handlers["text/plain"]

    def _switch_user_agent(self, retry_state: Optional[RetryCallState] = None) -> None:
        """
        Switches the User-Agent for this LinkContentFetcher to the next one in the list of user agents.

        Used by tenacity to retry the requests with a different user agent.

        :param retry_state: The retry state (unused, required by tenacity).
        """
        self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents)
        logger.debug("Switched user agent to {user_agent}", user_agent=self.user_agents[self.current_user_agent_idx])