• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 15254528833

26 May 2025 12:56PM UTC coverage: 90.146% (-0.3%) from 90.411%
15254528833

Pull #9426

github

web-flow
Merge 06c2b66b1 into 802328e29
Pull Request #9426: feat: add component name and type to `StreamingChunk`

11398 of 12644 relevant lines covered (90.15%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.33
haystack/logging.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import builtins
1✔
6
import functools
1✔
7
import logging
1✔
8
import os
1✔
9
import sys
1✔
10
import typing
1✔
11
from typing import Any, List, Optional
1✔
12

13
if typing.TYPE_CHECKING:
1✔
14
    from structlog.typing import EventDict, Processor, WrappedLogger
×
15

16
# Name of the environment variable consulted by `configure_logging` when no explicit `use_json`
# argument is given; the value "true" (case-insensitive) enables JSON rendering.
HAYSTACK_LOGGING_USE_JSON_ENV_VAR = "HAYSTACK_LOGGING_USE_JSON"
1✔
17
# Name of the environment variable that makes `configure_logging` skip the structlog setup entirely
# when its value is "true" (case-insensitive).
HAYSTACK_LOGGING_IGNORE_STRUCTLOG_ENV_VAR = "HAYSTACK_LOGGING_IGNORE_STRUCTLOG"
1✔
18

19

20
class PatchedLogger(typing.Protocol):
    """
    Structural type describing the logger returned by `getLogger`.

    Every logging method takes the message as its only positional argument (plus the level for `log`);
    everything else has to be passed by keyword. Annotating loggers with this protocol lets type checkers
    flag calls that pass positional interpolation arguments.
    """

    def debug(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Emit `msg` at DEBUG level; extra keyword arguments become structured log data."""

    def info(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Emit `msg` at INFO level; extra keyword arguments become structured log data."""

    def warn(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Emit `msg` at WARNING level (same signature as `warning`)."""

    def warning(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Emit `msg` at WARNING level; extra keyword arguments become structured log data."""

    def error(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Emit `msg` at ERROR level; extra keyword arguments become structured log data."""

    def critical(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Emit `msg` at CRITICAL level; extra keyword arguments become structured log data."""

    def exception(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Emit `msg` for an exception; extra keyword arguments become structured log data."""

    def fatal(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Emit `msg` as a fatal message; extra keyword arguments become structured log data."""

    def log(
        self,
        level: int,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Emit `msg` at the given numeric `level`; extra keyword arguments become structured log data."""

    def setLevel(self, level: int) -> None:
        """Set the threshold level of this logger."""
134

135

136
def patch_log_method_to_kwargs_only(func: typing.Callable) -> typing.Callable:
    """
    Wrap a logger method so that only the message may be passed positionally.

    All additional keyword arguments are merged into the `extra` dictionary that is handed to the wrapped
    method, which makes them available as structured log data.

    :param func: The bound logger method to wrap (e.g. `logger.info`).
    :returns: A wrapper accepting `msg` positionally and everything else by keyword.
    """

    @functools.wraps(func)
    def _log_only_with_kwargs(
        msg: str, *, _: Any = None, exc_info: Any = None, stack_info: Any = False, stacklevel: int = 1, **kwargs: Any
    ) -> Any:  # `_` is an unused keyword-only placeholder kept from the original signature
        # `extra=None` is a valid value for the standard library, treat it like a missing `extra`
        existing_extra = kwargs.pop("extra", None) or {}
        forwarded: typing.Dict[str, Any] = {
            "stack_info": stack_info,
            # we need to increase the stacklevel by 1 to point to the correct caller
            # (otherwise it points to this function)
            "stacklevel": stacklevel + 1,
            "extra": {**existing_extra, **kwargs},
        }
        # Only forward `exc_info` when the caller set it. Always passing `None` would override the
        # `exc_info=True` default of `Logger.exception` and silently drop the traceback.
        if exc_info is not None:
            forwarded["exc_info"] = exc_info
        return func(msg, **forwarded)

    return _log_only_with_kwargs
1✔
155

156

157
def patch_log_with_level_method_to_kwargs_only(func: typing.Callable) -> typing.Callable:
    """
    Wrap a level-taking logger method so that only `level` and `msg` may be passed positionally.

    All additional keyword arguments are merged into the `extra` dictionary that is handed to the wrapped
    method, which makes them available as structured log data.

    :param func: The bound logger method to wrap (e.g. `logger.log`).
    :returns: A wrapper accepting `level` and `msg` positionally and everything else by keyword.
    """

    @functools.wraps(func)
    def _log_only_with_kwargs(
        level: typing.Union[int, str],
        msg: str,
        *,
        _: Any = None,  # unused keyword-only placeholder kept from the original signature
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> Any:
        # `extra=None` is a valid value for the standard library, treat it like a missing `extra`
        existing_extra = kwargs.pop("extra", None) or {}

        return func(
            level,
            msg,
            exc_info=exc_info,
            stack_info=stack_info,
            # we need to increase the stacklevel by 1 to point to the correct caller
            # (otherwise it points to this function)
            stacklevel=stacklevel + 1,
            extra={**existing_extra, **kwargs},
        )

    return _log_only_with_kwargs
1✔
185

186

187
def patch_make_records_to_use_kwarg_string_interpolation(original_make_records: typing.Callable) -> typing.Callable:
    """
    Wrap `Logger.makeRecord` so that the message is interpolated with `str.format` using the `extra` data.

    The positional `args` are replaced by an empty tuple when delegating, so no `%`-style interpolation
    takes place afterwards. If the message cannot be formatted with the given `extra` values, it is passed
    through unchanged instead of raising from within a logging call.

    :param original_make_records: The bound `makeRecord` method to wrap.
    :returns: A wrapper with the same parameters as `makeRecord`.
    """

    @functools.wraps(original_make_records)
    def _wrapper(  # pylint: disable=too-many-positional-arguments
        name: str,
        level: typing.Union[int, str],
        fn: str,
        lno: int,
        msg: str,
        args: Any,
        exc_info: Any,
        func: Any = None,
        extra: Any = None,
        sinfo: Any = None,
    ) -> Any:
        safe_extra = extra or {}
        try:
            interpolated_msg = msg.format(**safe_extra)
        except (KeyError, ValueError, IndexError, AttributeError, TypeError):
            # AttributeError: `msg` is not a string (e.g. an exception instance) or `{a.b}` has no `b`
            # TypeError: e.g. `{a[0]}` used with a value that cannot be indexed
            # Logging must never break the caller, so fall back to the raw message.
            interpolated_msg = msg
        return original_make_records(name, level, fn, lno, interpolated_msg, (), exc_info, func, extra, sinfo)

    return _wrapper
1✔
211

212

213
def _patch_structlog_call_information(logger: logging.Logger) -> None:
1✔
214
    # structlog patches the findCaller to hide itself from the traceback.
215
    # We need to patch their patch to hide `haystack.logging` from the traceback.
216
    try:
1✔
217
        from structlog._frames import _find_first_app_frame_and_name, _format_stack
1✔
218
        from structlog.stdlib import _FixedFindCallerLogger
1✔
219

220
        if not isinstance(logger, _FixedFindCallerLogger):
1✔
221
            return
1✔
222

223
        # completely copied from structlog. We only add `haystack.logging` to the list of ignored frames
224
        # pylint: disable=unused-variable
225
        def findCaller(stack_info: bool = False, stacklevel: int = 1) -> typing.Tuple[str, int, str, Optional[str]]:
1✔
226
            try:
1✔
227
                sinfo: Optional[str]
228
                # we need to exclude `haystack.logging` from the stack
229
                f, name = _find_first_app_frame_and_name(["logging", "haystack.logging"])
1✔
230
                sinfo = _format_stack(f) if stack_info else None
1✔
231
            except Exception as error:
×
232
                print(f"Error in findCaller: {error}")
×
233

234
            return f.f_code.co_filename, f.f_lineno, f.f_code.co_name, sinfo
1✔
235

236
        logger.findCaller = findCaller  # type: ignore
1✔
237
    except ImportError:
×
238
        pass
×
239

240

241
def getLogger(name: str) -> PatchedLogger:
    """
    Get the Haystack logger, a patched version of the one from the standard library.

    We patch the default logger methods to make sure that they are only called with keyword arguments.
    We enforce keyword-arguments because
        - it brings in consistency
        - it makes structure logging effective, not just an available feature

    Patching is applied only once per logger instance, so calling this function repeatedly with the same
    name is safe.

    :param name: The name of the logger, passed on to `logging.getLogger`.
    :returns: The patched logger.
    """
    logger = logging.getLogger(name)

    # `logging.getLogger` hands out the same instance for the same name. Wrapping the already wrapped
    # methods again would increase `stacklevel` twice and run the message interpolation twice.
    if getattr(logger, "_haystack_patched", False):
        return typing.cast(PatchedLogger, logger)

    for method_name in ("debug", "info", "warn", "warning", "error", "critical", "exception", "fatal"):
        setattr(logger, method_name, patch_log_method_to_kwargs_only(getattr(logger, method_name)))
    logger.log = patch_log_with_level_method_to_kwargs_only(logger.log)  # type: ignore

    _patch_structlog_call_information(logger)

    # We also patch the `makeRecord` method to use keyword string interpolation
    logger.makeRecord = patch_make_records_to_use_kwarg_string_interpolation(logger.makeRecord)  # type: ignore

    logger._haystack_patched = True  # type: ignore

    return typing.cast(PatchedLogger, logger)
1✔
267

268

269
def add_line_and_file(_: "WrappedLogger", __: str, event_dict: "EventDict") -> "EventDict":
    """
    Copy line number and logger name of the underlying stdlib record into the event dictionary.

    The event dictionary is returned untouched when it carries no `_record` entry.
    """
    record = event_dict.get("_record")
    if record:
        event_dict["lineno"] = record.lineno
        # stored under the key "module", the value is the `name` attribute of the record
        event_dict["module"] = record.name

    return event_dict
1✔
279

280

281
def correlate_logs_with_traces(_: "WrappedLogger", __: str, event_dict: "EventDict") -> "EventDict":
    """
    Enrich a log entry with the correlation data of the currently active tracing span.

    The event dictionary is returned unchanged when tracing is disabled or when there is no current span.
    This makes it possible to correlate logs with traces.
    """
    import haystack.tracing.tracer  # imported here to avoid circular imports

    if haystack.tracing.is_tracing_enabled():
        active_span = haystack.tracing.tracer.current_span()
        if active_span:
            event_dict.update(active_span.get_correlation_data_for_logs())

    return event_dict
1✔
297

298

299
def configure_logging(use_json: Optional[bool] = None) -> None:
    """
    Set up log formatting for Haystack.

    - Without `structlog` installed nothing is changed; configuring logging is then up to the user.
    - With `structlog` installed, log entries are rendered including their key-value data. Set the
      environment variable `HAYSTACK_LOGGING_IGNORE_STRUCTLOG` to `true` to opt out.
    - With `structlog` installed, all logs can be rendered as JSON, either by
        - passing `use_json=True` to this function, or
        - setting the environment variable `HAYSTACK_LOGGING_USE_JSON` to `true`.
      If neither is given, JSON is used unless an interactive terminal, IPython or Jupyter is detected.

    :param use_json: Force (`True`) or forbid (`False`) JSON rendering; `None` defers to the environment.
    """
    import haystack.utils.jupyter  # imported here to avoid circular imports

    try:
        import structlog
        from structlog.processors import ExceptionRenderer
        from structlog.tracebacks import ExceptionDictTransformer
    except ImportError:
        # no structlog available: leave standard logging untouched
        return

    ignore_structlog = os.getenv(HAYSTACK_LOGGING_IGNORE_STRUCTLOG_ENV_VAR, "false").lower() == "true"
    if ignore_structlog:
        # the user opted out of structlog: leave standard logging untouched
        return

    # The setup below roughly follows the structlog documentation:
    # https://www.structlog.org/en/stable/standard-library.html#rendering-using-structlog-based-formatters-within-logging
    # structlog formats entries emitted via `logging` as well as entries emitted via `structlog`.

    if use_json is None:  # an explicit argument wins over the environment
        env_value = os.getenv(HAYSTACK_LOGGING_USE_JSON_ENV_VAR)
        if env_value is not None:
            # explicit choice through the environment variable
            use_json = env_value.lower() == "true"
        else:
            # best-effort guess whether a human is looking at the output
            is_interactive = (
                sys.stderr.isatty() or hasattr(builtins, "__IPYTHON__") or haystack.utils.jupyter.is_in_jupyter()
            )
            use_json = not is_interactive

    shared_processors: List[Processor] = [
        # puts the log level into the event_dict
        structlog.stdlib.add_log_level,
        # ISO formatted timestamp
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.contextvars.merge_contextvars,
        add_line_and_file,
    ]
    if use_json:
        # only relevant for production setups that correlate logs with traces
        shared_processors.append(correlate_logs_with_traces)

    structlog.configure(
        processors=[*shared_processors, structlog.stdlib.ProcessorFormatter.wrap_for_formatter],
        logger_factory=structlog.stdlib.LoggerFactory(ignore_frame_names=["haystack.logging"]),
        cache_logger_on_first_use=True,
        # drops entries below the effective level of the root logger
        wrapper_class=structlog.make_filtering_bound_logger(min_level=logging.root.getEffectiveLevel()),
    )

    final_renderers: List[Processor]
    if use_json:
        # locals are not rendered in production logs - they can contain sensitive information
        exception_renderer = ExceptionRenderer(ExceptionDictTransformer(show_locals=False))
        final_renderers = [exception_renderer, structlog.processors.JSONRenderer()]
    else:
        final_renderers = [structlog.dev.ConsoleRenderer()]

    formatter = structlog.stdlib.ProcessorFormatter(
        # Applied ONLY to `logging` entries that were not created by structlog;
        # `ExtraAdder` moves the `logging` `extras` into the event dictionary.
        foreign_pre_chain=[*shared_processors, structlog.stdlib.ExtraAdder()],
        # Applied to ALL entries once the pre_chain has run.
        processors=[
            # strips _record & _from_structlog so this metadata does not end up in the final log record
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            *final_renderers,
        ],
    )

    haystack_handler = logging.StreamHandler()
    haystack_handler.name = "HaystackLoggingHandler"
    # all `logging` entries are formatted by OUR `ProcessorFormatter`
    haystack_handler.setFormatter(formatter)

    root_logger = logging.getLogger()
    # drop a previously installed Haystack handler so that it is never registered twice
    foreign_handlers = []
    for existing in root_logger.handlers:
        if isinstance(existing, logging.StreamHandler) and existing.name == "HaystackLoggingHandler":
            continue
        foreign_handlers.append(existing)
    root_logger.handlers = [haystack_handler, *foreign_handlers]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc