haystack/components/fetchers/link_content.py

# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from fnmatch import fnmatch
from typing import Callable, Dict, List, Optional, Tuple, cast

import httpx
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from haystack import component, logging
from haystack.dataclasses import ByteStream
from haystack.lazy_imports import LazyImport
from haystack.version import __version__

# HTTP/2 support via lazy import
with LazyImport("Run 'pip install httpx[http2]' to use HTTP/2 support") as h2_import:
    # Import h2 so that h2_import.check() can actually detect whether HTTP/2 support is
    # installed; httpx uses the package internally once the http2 flag is set.
    import h2  # noqa: F401  # pylint: disable=unused-import

logger = logging.getLogger(__name__)


DEFAULT_USER_AGENT = f"haystack/LinkContentFetcher/{__version__}"

REQUEST_HEADERS = {
    "accept": "*/*",
    "User-Agent": DEFAULT_USER_AGENT,
    "Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7",
    "referer": "https://www.google.com/",
}


def _text_content_handler(response: httpx.Response) -> ByteStream:
    """
    Handles text content.

    :param response: Response object from the request.
    :returns: The extracted text, wrapped in a ByteStream.
    """
    return ByteStream.from_string(response.text)


def _binary_content_handler(response: httpx.Response) -> ByteStream:
    """
    Handles binary content.

    :param response: Response object from the request.
    :returns: The extracted binary content, wrapped in a ByteStream.
    """
    return ByteStream(data=response.content)


@component
class LinkContentFetcher:
    """
    Fetches and extracts content from URLs.

    It supports various content types, retries on failures, and automatically rotates user agents for failed
    web requests. Use it as the data-fetching step in your pipelines.

    To convert LinkContentFetcher's output into a list of documents, use the HTMLToDocument converter.

    ### Usage example

    ```python
    from haystack.components.fetchers.link_content import LinkContentFetcher

    fetcher = LinkContentFetcher()
    streams = fetcher.run(urls=["https://www.google.com"])["streams"]

    assert len(streams) == 1
    assert streams[0].meta == {'content_type': 'text/html', 'url': 'https://www.google.com'}
    assert streams[0].data
    ```

    For async usage:

    ```python
    import asyncio
    from haystack.components.fetchers import LinkContentFetcher

    async def fetch_async():
        fetcher = LinkContentFetcher()
        result = await fetcher.run_async(urls=["https://www.google.com"])
        return result["streams"]

    streams = asyncio.run(fetch_async())
    ```
    """

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        raise_on_failure: bool = True,
        user_agents: Optional[List[str]] = None,
        retry_attempts: int = 2,
        timeout: int = 3,
        http2: bool = False,
        client_kwargs: Optional[Dict] = None,
    ):
        """
        Initializes the component.

        :param raise_on_failure: If `True`, raises an exception if it fails to fetch a single URL.
            For multiple URLs, it logs errors and returns the content it successfully fetched.
        :param user_agents: [User agents](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent)
            for fetching content. If `None`, a default user agent is used.
        :param retry_attempts: The number of times to retry fetching a URL's content.
        :param timeout: Timeout in seconds for the request.
        :param http2: Whether to enable HTTP/2 support for requests. Defaults to `False`.
            Requires the 'h2' package to be installed (via `pip install httpx[http2]`).
        :param client_kwargs: Additional keyword arguments to pass to the httpx client.
            If `None`, default values are used.
        """
        self.raise_on_failure = raise_on_failure
        self.user_agents = user_agents or [DEFAULT_USER_AGENT]
        self.current_user_agent_idx: int = 0
        self.retry_attempts = retry_attempts
        self.timeout = timeout
        self.http2 = http2
        self.client_kwargs = client_kwargs or {}

        # Configure default client settings
        self.client_kwargs.setdefault("timeout", timeout)
        self.client_kwargs.setdefault("follow_redirects", True)

        # Create httpx clients
        client_kwargs = {**self.client_kwargs}

        # Optional HTTP/2 support
        if http2:
            try:
                h2_import.check()
                client_kwargs["http2"] = True
            except ImportError:
                logger.warning(
                    "HTTP/2 support requested but 'h2' package is not installed. "
                    "Falling back to HTTP/1.1. Install with `pip install httpx[http2]` to enable HTTP/2 support."
                )
                self.http2 = False  # Update the setting to match actual capability

        # Initialize synchronous client
        self._client = httpx.Client(**client_kwargs)

        # Initialize asynchronous client
        self._async_client = httpx.AsyncClient(**client_kwargs)

        # register default content handlers that extract data from the response
        self.handlers: Dict[str, Callable[[httpx.Response], ByteStream]] = defaultdict(lambda: _text_content_handler)
        self.handlers["text/*"] = _text_content_handler
        self.handlers["text/html"] = _binary_content_handler
        self.handlers["application/json"] = _text_content_handler
        self.handlers["application/*"] = _binary_content_handler
        self.handlers["image/*"] = _binary_content_handler
        self.handlers["audio/*"] = _binary_content_handler
        self.handlers["video/*"] = _binary_content_handler
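
        # How these entries resolve in practice (see _resolve_handler): a direct key such as
        # "text/html" wins first; otherwise the first registered fnmatch pattern that matches is
        # used, e.g. "application/pdf" -> "application/*" -> binary handler, and
        # "text/plain" -> "text/*" -> text handler. Anything not matched at all falls back to
        # the defaultdict's text handler.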

        @retry(
            reraise=True,
            stop=stop_after_attempt(self.retry_attempts),
            wait=wait_exponential(multiplier=1, min=2, max=10),
            retry=(retry_if_exception_type((httpx.HTTPStatusError, httpx.RequestError))),
            # This method is invoked only after failed requests (exception raised)
            after=self._switch_user_agent,
        )
        def get_response(url):
            # we need to copy because we modify the headers
            headers = REQUEST_HEADERS.copy()
            headers["User-Agent"] = self.user_agents[self.current_user_agent_idx]
            response = self._client.get(url, headers=headers)
            response.raise_for_status()
            return response

        self._get_response: Callable = get_response
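
        # Note on timing (illustrative): with the default retry_attempts=2, tenacity makes at
        # most two attempts in total, waiting roughly 2 s before the single retry
        # (wait_exponential clamped between min=2 and max=10), with a different user agent.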

    def __del__(self):
        """
        Clean up resources when the component is deleted.

        Closes the synchronous HTTP client to prevent resource leaks. The asynchronous
        client cannot be closed here because doing so requires an await.
        """
        try:
            # Close the synchronous client if it exists
            if hasattr(self, "_client"):
                self._client.close()
        except Exception:
            # Suppress any exceptions during cleanup
            pass

    @component.output_types(streams=List[ByteStream])
    def run(self, urls: List[str]):
        """
        Fetches content from a list of URLs and returns a list of extracted content streams.

        Each content stream is a `ByteStream` object containing the extracted content as binary data.
        Each ByteStream object in the returned list corresponds to the contents of a single URL.
        The content type of each stream is stored in the metadata of the ByteStream object under
        the key "content_type". The URL of the fetched content is stored under the key "url".

        :param urls: A list of URLs to fetch content from.
        :returns: `ByteStream` objects representing the extracted content.

        :raises Exception: If the provided list contains a single URL and `raise_on_failure` is set to
            `True`, an exception is raised in case of an error during content retrieval.
            In all other scenarios, retrieval errors are logged, and a list of the successfully retrieved
            `ByteStream` objects is returned.
        """
        streams: List[ByteStream] = []
        if not urls:
            return {"streams": streams}

        # don't use multithreading if there's only one URL
        if len(urls) == 1:
            stream_metadata, stream = self._fetch(urls[0])
            stream.meta.update(stream_metadata)
            stream.mime_type = stream.meta.get("content_type", None)
            streams.append(stream)
        else:
            with ThreadPoolExecutor() as executor:
                results = executor.map(self._fetch_with_exception_suppression, urls)

            for stream_metadata, stream in results:  # type: ignore
                if stream_metadata is not None and stream is not None:
                    stream.meta.update(stream_metadata)
                    stream.mime_type = stream.meta.get("content_type", None)
                    streams.append(stream)

        return {"streams": streams}

    @component.output_types(streams=List[ByteStream])
    async def run_async(self, urls: List[str]):
        """
        Asynchronously fetches content from a list of URLs and returns a list of extracted content streams.

        This is the asynchronous version of the `run` method with the same parameters and return values.

        :param urls: A list of URLs to fetch content from.
        :returns: `ByteStream` objects representing the extracted content.
        """
        streams: List[ByteStream] = []
        if not urls:
            return {"streams": streams}

        # Create tasks for all URLs using _fetch_async directly
        tasks = [self._fetch_async(url, self._async_client) for url in urls]

        # Only capture exceptions when we have multiple URLs or raise_on_failure=False.
        # This ensures errors propagate appropriately for single URLs with raise_on_failure=True.
        return_exceptions = not (len(urls) == 1 and self.raise_on_failure)
        results = await asyncio.gather(*tasks, return_exceptions=return_exceptions)

        # Process results
        for i, result in enumerate(results):
            # Handle exception results (only happens when return_exceptions=True)
            if isinstance(result, Exception):
                logger.warning("Error fetching {url}: {error}", url=urls[i], error=str(result))
                # Add an empty result for failed URLs when raise_on_failure=False
                if not self.raise_on_failure:
                    streams.append(ByteStream(data=b"", meta={"content_type": "Unknown", "url": urls[i]}))
                continue

            # Process successful results. At this point, result is not an exception, so we cast
            # it to the tuple type returned by _fetch_async to keep mypy happy.
            result_tuple = cast(Tuple[Optional[Dict[str, str]], Optional[ByteStream]], result)
            stream_metadata, stream = result_tuple
            if stream_metadata is not None and stream is not None:
                stream.meta.update(stream_metadata)
                stream.mime_type = stream.meta.get("content_type", None)
                streams.append(stream)

        return {"streams": streams}

    def _fetch(self, url: str) -> Tuple[Dict[str, str], ByteStream]:
        """
        Fetches content from a URL and returns it as a ByteStream.

        :param url: The URL to fetch content from.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
            ByteStream metadata contains the URL and the content type of the fetched content.
            The content type is a string indicating the type of content fetched (for example, "text/html",
            "application/pdf"). The ByteStream object contains the fetched content as binary data.

        :raises: If an error occurs during content retrieval and `raise_on_failure` is set to `True`, this
            method raises an exception. Otherwise, all fetching errors are logged, and an empty ByteStream
            is returned.
        """
        content_type: str = "text/html"
        stream: ByteStream = ByteStream(data=b"")
        try:
            response = self._get_response(url)
            content_type = self._get_content_type(response)
            handler: Callable = self._resolve_handler(content_type)
            stream = handler(response)
        except Exception as e:
            if self.raise_on_failure:
                raise e
            # less verbose log as this is expected to happen often (requests failing, blocked, etc.)
            logger.debug("Couldn't retrieve content from {url} because {error}", url=url, error=str(e))
        finally:
            self.current_user_agent_idx = 0

        return {"content_type": content_type, "url": url}, stream

    async def _fetch_async(
        self, url: str, client: httpx.AsyncClient
    ) -> Tuple[Optional[Dict[str, str]], Optional[ByteStream]]:
        """
        Asynchronously fetches content from a URL and returns it as a ByteStream.

        :param url: The URL to fetch content from.
        :param client: The async httpx client to use for making requests.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
        """
        content_type: str = "text/html"
        stream: Optional[ByteStream] = None
        metadata: Optional[Dict[str, str]] = None

        try:
            response = await self._get_response_async(url, client)
            content_type = self._get_content_type(response)
            handler: Callable = self._resolve_handler(content_type)
            stream = handler(response)
            metadata = {"content_type": content_type, "url": url}
        except Exception as e:
            if self.raise_on_failure:
                raise e
            logger.debug("Couldn't retrieve content from {url} because {error}", url=url, error=str(e))
            # Create an empty ByteStream for failed requests when raise_on_failure is False
            stream = ByteStream(data=b"")
            metadata = {"content_type": content_type, "url": url}
        finally:
            self.current_user_agent_idx = 0

        return metadata, stream

    def _fetch_with_exception_suppression(self, url: str) -> Tuple[Optional[Dict[str, str]], Optional[ByteStream]]:
        """
        Fetches content from a URL and returns it as a ByteStream.

        If `raise_on_failure` is set to `True`, this method wraps `_fetch()` and catches any exceptions so
        that a single failing URL does not abort a whole batch. Otherwise, it simply calls `_fetch()`, which
        already suppresses errors itself.

        :param url: The URL to fetch content from.
        :returns: A tuple containing the ByteStream metadata dict and the corresponding ByteStream.
        """
        if self.raise_on_failure:
            try:
                return self._fetch(url)
            except Exception as e:
                logger.warning("Error fetching {url}: {error}", url=url, error=str(e))
                return {"content_type": "Unknown", "url": url}, None
        else:
            return self._fetch(url)

    async def _get_response_async(self, url: str, client: httpx.AsyncClient) -> httpx.Response:
        """
        Asynchronously gets a response from a URL with retry logic.

        :param url: The URL to fetch.
        :param client: The async httpx client to use for making requests.
        :returns: The httpx Response object.
        """
        attempt = 0
        last_exception = None

        while attempt <= self.retry_attempts:
            try:
                headers = REQUEST_HEADERS.copy()
                headers["User-Agent"] = self.user_agents[self.current_user_agent_idx]
                response = await client.get(url, headers=headers)
                response.raise_for_status()
                return response
            except (httpx.HTTPStatusError, httpx.RequestError) as e:
                last_exception = e
                attempt += 1
                if attempt <= self.retry_attempts:
                    self._switch_user_agent(None)  # Switch user agent for next retry
                    # Wait before retry using exponential backoff
                    await asyncio.sleep(min(2 * 2 ** (attempt - 1), 10))
                else:
                    break

        # If we've exhausted all retries, raise the last exception
        if last_exception:
            raise last_exception

        # This should never happen, but just in case
        raise httpx.RequestError("Failed to get response after retries", request=None)
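
    # The backoff above sleeps min(2 * 2 ** (attempt - 1), 10) seconds: 2 s after the first
    # failure, 4 s after the second, 8 s after the third, then capped at 10 s thereafter.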

    def _get_content_type(self, response: httpx.Response) -> str:
        """
        Get the content type of the response.

        :param response: The response object.
        :returns: The content type of the response, without parameters such as charset.
        """
        content_type = response.headers.get("Content-Type", "")
        return content_type.split(";")[0]
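
    # For example, a Content-Type header of "text/html; charset=utf-8" yields "text/html".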

    def _resolve_handler(self, content_type: str) -> Callable[[httpx.Response], ByteStream]:
        """
        Resolves the handler for the given content type.

        First, it tries to find a direct match for the content type in the handlers dictionary.
        If no direct match is found, it tries to find a pattern match using the fnmatch function.
        If no pattern match is found, it returns the default handler for text/plain.

        :param content_type: The content type to resolve the handler for.
        :returns: The handler for the given content type, if found. Otherwise, the default handler for text/plain.
        """
        # direct match
        if content_type in self.handlers:
            return self.handlers[content_type]

        # pattern matches
        for pattern, handler in self.handlers.items():
            if fnmatch(content_type, pattern):
                return handler

        # default handler
        return self.handlers["text/plain"]

    def _switch_user_agent(self, retry_state: Optional[RetryCallState] = None) -> None:
        """
        Switches the User-Agent for this LinkContentFetcher to the next one in the list of user agents.

        Used by tenacity to retry the requests with a different user agent.

        :param retry_state: The retry state (unused, required by tenacity).
        """
        self.current_user_agent_idx = (self.current_user_agent_idx + 1) % len(self.user_agents)
        logger.debug("Switched user agent to {user_agent}", user_agent=self.user_agents[self.current_user_agent_idx])
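
# A minimal usage sketch (illustrative only, URLs are hypothetical): fetch several pages while
# tolerating failures instead of raising.
#
#     fetcher = LinkContentFetcher(raise_on_failure=False, retry_attempts=3, timeout=5)
#     streams = fetcher.run(urls=["https://example.com", "https://example.com/missing"])["streams"]
#     for s in streams:
#         print(s.meta["url"], s.meta["content_type"], len(s.data))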