deepset-ai / haystack / build 19171927776
07 Nov 2025 02:49PM UTC coverage: 91.468% (-0.1%) from 91.578%

Pull Request #10035: feat: return `logprobs` in `OpenAIChatGenerator` and `OpenAIResponsesChatGenerator`
Merge 4f2a5ebff into ea6ef8c94 (github / web-flow)

13722 of 15002 relevant lines covered (91.47%)
0.91 hits per line
Source file: haystack/components/generators/chat/openai_responses.py (52.56% covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import json
import os
from datetime import datetime
from typing import Any, Optional, Union

from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from openai.lib._pydantic import to_strict_json_schema
from openai.types.responses import ParsedResponse, Response, ResponseOutputRefusal, ResponseStreamEvent
from pydantic import BaseModel

from haystack import component, default_from_dict, default_to_dict, logging
from haystack.components.generators.utils import _serialize_object
from haystack.dataclasses import (
    AsyncStreamingCallbackT,
    ChatMessage,
    ComponentInfo,
    ImageContent,
    ReasoningContent,
    StreamingCallbackT,
    StreamingChunk,
    SyncStreamingCallbackT,
    TextContent,
    ToolCall,
    ToolCallDelta,
    select_streaming_callback,
)
from haystack.tools import (
    ToolsType,
    _check_duplicate_tool_names,
    deserialize_tools_or_toolset_inplace,
    flatten_tools_or_toolsets,
    serialize_tools_or_toolset,
    warm_up_tools,
)
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable
from haystack.utils.http_client import init_http_client

logger = logging.getLogger(__name__)


@component
class OpenAIResponsesChatGenerator:
    """
    Completes chats using OpenAI's Responses API.

    It works with the GPT-4 and o-series models and supports streaming responses
    from the OpenAI API. It uses the [ChatMessage](https://docs.haystack.deepset.ai/docs/chatmessage)
    format for input and output.

    You can customize how the text is generated by passing parameters to the
    OpenAI API. Use the `**generation_kwargs` argument when you initialize
    the component or when you run it. Any parameter that works with
    `openai.Responses.create` will work here too.

    For details on OpenAI API parameters, see the
    [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses).

    ### Usage example

    ```python
    from haystack.components.generators.chat import OpenAIResponsesChatGenerator
    from haystack.dataclasses import ChatMessage

    messages = [ChatMessage.from_user("What's Natural Language Processing?")]

    client = OpenAIResponsesChatGenerator(generation_kwargs={"reasoning": {"effort": "low", "summary": "auto"}})
    response = client.run(messages)
    print(response)
    ```
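
    With the logprobs support added in this pull request, token log probabilities
    returned by the Responses API are surfaced in `ChatMessage.meta["logprobs"]`.
    A minimal sketch of requesting them, assuming the target model supports the
    Responses API `include` and `top_logprobs` options:

    ```python
    client = OpenAIResponsesChatGenerator(
        generation_kwargs={"include": ["message.output_text.logprobs"], "top_logprobs": 2}
    )
    response = client.run([ChatMessage.from_user("What's Natural Language Processing?")])
    print(response["replies"][0].meta.get("logprobs"))
    ```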
    """

    def __init__(
        self,
        *,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-5-mini",
        streaming_callback: Optional[StreamingCallbackT] = None,
        api_base_url: Optional[str] = None,
        organization: Optional[str] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: bool = False,
        http_client_kwargs: Optional[dict[str, Any]] = None,
    ):
        """
        Creates an instance of OpenAIResponsesChatGenerator. Uses OpenAI's gpt-5-mini by default.

        Before initializing the component, you can set the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES'
        environment variables to override the `timeout` and `max_retries` parameters respectively
        in the OpenAI client.

        :param api_key: The OpenAI API key.
            You can set it with the `OPENAI_API_KEY` environment variable or pass it with this parameter
            during initialization.
        :param model: The name of the model to use.
        :param streaming_callback: A callback function that is called when a new token is received from the stream.
            The callback function accepts [StreamingChunk](https://docs.haystack.deepset.ai/docs/data-classes#streamingchunk)
            as an argument.
        :param api_base_url: An optional base URL.
        :param organization: Your organization ID, defaults to `None`. See
            [production best practices](https://platform.openai.com/docs/guides/production-best-practices/setting-up-your-organization).
        :param generation_kwargs: Other parameters to use for the model. These parameters are sent
            directly to the OpenAI endpoint.
            See the OpenAI [documentation](https://platform.openai.com/docs/api-reference/responses) for
            more details.
            Some of the supported parameters:
            - `temperature`: What sampling temperature to use. Higher values like 0.8 will make the output more random,
                while lower values like 0.2 will make it more focused and deterministic.
            - `top_p`: An alternative to sampling with temperature, called nucleus sampling, where the model
                considers the results of the tokens with top_p probability mass. For example, 0.1 means only the tokens
                comprising the top 10% probability mass are considered.
            - `previous_response_id`: The ID of the previous response.
                Use this to create multi-turn conversations.
            - `text_format`: A JSON schema or a Pydantic model that enforces the structure of the model's response.
                If provided, the output will always be validated against this
                format (unless the model returns a tool call).
                For details, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs).
                Notes:
                - This parameter accepts Pydantic models and JSON schemas for the latest models, starting from GPT-4o.
                  Older models only support a basic version of structured outputs through `{"type": "json_object"}`.
                  For detailed information on JSON mode, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs#json-mode).
                - For structured outputs with streaming,
                  the `text_format` must be a JSON schema and not a Pydantic model.
            - `reasoning`: A dictionary of parameters for reasoning. For example:
                - `summary`: The summary of the reasoning.
                - `effort`: The level of effort to put into the reasoning. Can be `low`, `medium`, or `high`.
                - `generate_summary`: Whether to generate a summary of the reasoning.
                Note: OpenAI does not return the reasoning tokens, but the summary can be viewed if it's enabled.
                For details, see the [OpenAI Reasoning documentation](https://platform.openai.com/docs/guides/reasoning).
        :param timeout:
            Timeout for OpenAI client calls. If not set, it defaults to either the
            `OPENAI_TIMEOUT` environment variable or 30 seconds.
        :param max_retries:
            Maximum number of retries to contact OpenAI after an internal error.
            If not set, it defaults to either the `OPENAI_MAX_RETRIES` environment variable or 5.
        :param tools:
            The tools that the model can use to prepare calls. This parameter accepts either a
            mixed list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of
            OpenAI/MCP tool definition dictionaries.
            Note: You cannot pass OpenAI/MCP tools and Haystack tools together.
            For details on tool support, see the [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create#responses-create-tools).
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `False`, the model may not exactly
            follow the schema provided in the `parameters` field of the tool definition. In the Responses API, tool
            calls are strict by default.
        :param http_client_kwargs:
            A dictionary of keyword arguments to configure a custom `httpx.Client` or `httpx.AsyncClient`.
            For more information, see the [HTTPX documentation](https://www.python-httpx.org/api/#client).
        """
        self.api_key = api_key
        self.model = model
        self.generation_kwargs = generation_kwargs or {}
        self.streaming_callback = streaming_callback
        self.api_base_url = api_base_url
        self.organization = organization
        self.timeout = timeout
        self.max_retries = max_retries
        self.tools = tools  # Store tools as-is, whether it's a list or a Toolset
        self.tools_strict = tools_strict
        self.http_client_kwargs = http_client_kwargs

        if timeout is None:
            timeout = float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
        if max_retries is None:
            max_retries = int(os.environ.get("OPENAI_MAX_RETRIES", "5"))

        resolved_api_key = api_key.resolve_value() if isinstance(api_key, Secret) else api_key
        client_kwargs: dict[str, Any] = {
            "api_key": resolved_api_key,
            "organization": organization,
            "base_url": api_base_url,
            "timeout": timeout,
            "max_retries": max_retries,
        }

        self.client = OpenAI(http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_kwargs)
        self.async_client = AsyncOpenAI(
            http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_kwargs
        )
        self._is_warmed_up = False

    def warm_up(self):
        """
        Warm up the OpenAI responses chat generator.

        This will warm up the tools registered in the chat generator.
        This method is idempotent and will only warm up the tools once.
        """
        if not self._is_warmed_up:
            # Guard against an empty tools list before inspecting its first element
            is_openai_tool = isinstance(self.tools, list) and len(self.tools) > 0 and isinstance(self.tools[0], dict)
            # We only warm up Haystack tools, not OpenAI/MCP tools
            # The type ignore is needed because mypy cannot infer the type correctly
            if not is_openai_tool:
                warm_up_tools(self.tools)  # type: ignore[arg-type]
            self._is_warmed_up = True

    def _get_telemetry_data(self) -> dict[str, Any]:
        """
        Data that is sent to Posthog for usage analytics.
        """
        return {"model": self.model}

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize this component to a dictionary.

        :returns:
            The serialized component as a dictionary.
        """
        callback_name = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        generation_kwargs = self.generation_kwargs.copy()
        response_format = generation_kwargs.get("text_format")

        # If the response format is a Pydantic model, it's converted to OpenAI's JSON schema format
        # If it's already a JSON schema, it's left as is
        if response_format and issubclass(response_format, BaseModel):
            json_schema = {
                "type": "json_schema",
                "json_schema": {
                    "name": response_format.__name__,
                    "strict": True,
                    "schema": to_strict_json_schema(response_format),
                },
            }
            generation_kwargs["text_format"] = json_schema

        # OpenAI/MCP tools are passed as a list of dictionaries
        serialized_tools: Union[dict[str, Any], list[dict[str, Any]], None]
        if self.tools and isinstance(self.tools, list) and isinstance(self.tools[0], dict):
            # mypy can't infer that self.tools is list[dict] here
            serialized_tools = self.tools  # type: ignore[assignment]
        else:
            serialized_tools = serialize_tools_or_toolset(self.tools)  # type: ignore[arg-type]

        return default_to_dict(
            self,
            model=self.model,
            streaming_callback=callback_name,
            api_base_url=self.api_base_url,
            organization=self.organization,
            generation_kwargs=generation_kwargs,
            api_key=self.api_key.to_dict(),
            timeout=self.timeout,
            max_retries=self.max_retries,
            tools=serialized_tools,
            tools_strict=self.tools_strict,
            http_client_kwargs=self.http_client_kwargs,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "OpenAIResponsesChatGenerator":
        """
        Deserialize this component from a dictionary.

        :param data: The dictionary representation of this component.
        :returns:
            The deserialized component instance.
        """
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])

        # We only deserialize the tools if they are Haystack tools
        # because OpenAI tools are not serialized in the same way
        tools = data["init_parameters"].get("tools")
        if tools and (
            (isinstance(tools, dict) and tools.get("type") == "haystack.tools.toolset.Toolset")
            or (isinstance(tools, list) and tools[0].get("type") == "haystack.tools.tool.Tool")
        ):
            deserialize_tools_or_toolset_inplace(data["init_parameters"], key="tools")

        init_params = data.get("init_parameters", {})
        serialized_callback_handler = init_params.get("streaming_callback")

        if serialized_callback_handler:
            data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)
        return default_from_dict(cls, data)
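
    # Round-trip sketch: the output of `to_dict` feeds straight back into `from_dict`:
    #     restored = OpenAIResponsesChatGenerator.from_dict(generator.to_dict())
    # Note that a Pydantic `text_format` is serialized to its JSON schema, so the
    # restored component carries the schema rather than the original model class.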

    @component.output_types(replies=list[ChatMessage])
    def run(
        self,
        messages: list[ChatMessage],
        *,
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Invokes response generation based on the provided messages and generation parameters.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see the [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create).
        :param tools:
            The tools that the model can use to prepare calls. If set, it will override the
            `tools` parameter set during component initialization. This parameter accepts either a
            mixed list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of
            OpenAI/MCP tool definition dictionaries.
            Note: You cannot pass OpenAI/MCP tools and Haystack tools together.
            For details on tool support, see the [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create#responses-create-tools).
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `False`, the model may not exactly
            follow the schema provided in the `parameters` field of the tool definition. In the Responses API, tool
            calls are strict by default.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
        """
        if len(messages) == 0:
            return {"replies": []}

        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
        )
        responses: Union[Stream[ResponseStreamEvent], Response]

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )
        openai_endpoint = api_args.pop("openai_endpoint")
        openai_endpoint_method = getattr(self.client.responses, openai_endpoint)
        responses = openai_endpoint_method(**api_args)

        if streaming_callback is not None:
            response_output = self._handle_stream_response(
                responses,  # type: ignore
                streaming_callback,
            )
        else:
            assert isinstance(responses, Response), "Unexpected response type for non-streaming request."
            response_output = [_convert_response_to_chat_message(responses)]
        return {"replies": response_output}
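
    # Streaming usage sketch (the callback name is illustrative):
    #     def print_chunk(chunk: StreamingChunk) -> None:
    #         print(chunk.content, end="", flush=True)
    #     generator.run(messages, streaming_callback=print_chunk)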

    @component.output_types(replies=list[ChatMessage])
    async def run_async(
        self,
        messages: list[ChatMessage],
        *,
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Asynchronously invokes response generation based on the provided messages and generation parameters.

        This is the asynchronous version of the `run` method. It has the same parameters and return values
        but can be used with `await` in async code.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
            Must be a coroutine.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see the [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create).
        :param tools:
            The tools that the model can use to prepare calls. If set, it will override the
            `tools` parameter set during component initialization. This parameter accepts either a
            mixed list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of
            OpenAI/MCP tool definition dictionaries.
            Note: You cannot pass OpenAI/MCP tools and Haystack tools together.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
        """
        # validate and select the streaming callback
        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
        )
        responses: Union[AsyncStream[ResponseStreamEvent], Response]

        if len(messages) == 0:
            return {"replies": []}

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )

        openai_endpoint = api_args.pop("openai_endpoint")
        openai_endpoint_method = getattr(self.async_client.responses, openai_endpoint)
        responses = await openai_endpoint_method(**api_args)

        if streaming_callback is not None:
            response_output = await self._handle_async_stream_response(
                responses,  # type: ignore
                streaming_callback,
            )

        else:
            assert isinstance(responses, Response), "Unexpected response type for non-streaming request."
            response_output = [_convert_response_to_chat_message(responses)]
        return {"replies": response_output}

    def _prepare_api_call(  # noqa: PLR0913
        self,
        *,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: Optional[bool] = None,
    ) -> dict[str, Any]:
        # update generation kwargs by merging with the generation kwargs passed to the run method
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}

        text_format = generation_kwargs.pop("text_format", None)

        # adapt ChatMessage(s) to the format expected by the OpenAI API
        openai_formatted_messages: list[dict[str, Any]] = []
        for message in messages:
            openai_formatted_messages.extend(_convert_chat_message_to_responses_api_format(message))

        tools = tools or self.tools
        tools_strict = tools_strict if tools_strict is not None else self.tools_strict

        openai_tools = {}
        # Build tool definitions
        if tools:
            tool_definitions: list[Any] = []
            if isinstance(tools, list) and isinstance(tools[0], dict):
                # Predefined OpenAI/MCP-style tools
                tool_definitions = tools
            # Convert all tool objects to the correct OpenAI-compatible structure
            else:
                # mypy can't infer that tools is ToolsType here
                flattened_tools = flatten_tools_or_toolsets(tools)  # type: ignore[arg-type]
                _check_duplicate_tool_names(flattened_tools)
                for t in flattened_tools:
                    function_spec = {**t.tool_spec}
                    if not tools_strict:
                        function_spec["strict"] = False
                    function_spec["parameters"]["additionalProperties"] = False
                    tool_definitions.append({"type": "function", **function_spec})

            openai_tools = {"tools": tool_definitions}

        base_args = {"model": self.model, "input": openai_formatted_messages, **openai_tools, **generation_kwargs}

        # we pass a key `openai_endpoint` as a hint to the run method to use the create or parse endpoint;
        # this key will be removed before the API call is made
        if text_format and issubclass(text_format, BaseModel):
            return {
                **base_args,
                "stream": streaming_callback is not None,
                "text_format": text_format,
                "openai_endpoint": "parse",
            }
        return {**base_args, "stream": streaming_callback is not None, "openai_endpoint": "create"}
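
    # Returned mapping sketch: a non-streaming call with Haystack tools yields
    # something like
    #     {"model": "gpt-5-mini", "input": [...], "tools": [...],
    #      "stream": False, "openai_endpoint": "create"}
    # `run`/`run_async` pop `openai_endpoint` to pick `responses.create` or
    # `responses.parse` and pass the remaining keys to the client unchanged.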

    def _handle_stream_response(self, responses: Stream, callback: SyncStreamingCallbackT) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []

        for openai_chunk in responses:  # pylint: disable=not-an-iterable
            chunk_delta = _convert_response_chunk_to_streaming_chunk(
                chunk=openai_chunk, previous_chunks=chunks, component_info=component_info
            )
            if chunk_delta:
                chunks.append(chunk_delta)
                callback(chunk_delta)
        chat_message = _convert_streaming_chunks_to_chat_message(chunks=chunks)
        return [chat_message]

    async def _handle_async_stream_response(
        self, responses: AsyncStream, callback: AsyncStreamingCallbackT
    ) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []
        async for openai_chunk in responses:  # pylint: disable=not-an-iterable
            chunk_delta = _convert_response_chunk_to_streaming_chunk(
                chunk=openai_chunk, previous_chunks=chunks, component_info=component_info
            )
            if chunk_delta:
                chunks.append(chunk_delta)
                await callback(chunk_delta)
        chat_message = _convert_streaming_chunks_to_chat_message(chunks=chunks)
        return [chat_message]


def _convert_response_to_chat_message(responses: Union[Response, ParsedResponse]) -> ChatMessage:
    """
    Converts the non-streaming response from the OpenAI API to a ChatMessage.

    :param responses: The responses returned by the OpenAI API.
    :returns: The ChatMessage.
    """

    tool_calls = []
    reasoning = None
    logprobs: list[dict] = []
    for output in responses.output:
        if isinstance(output, ResponseOutputRefusal):
            logger.warning("OpenAI returned a refusal output: {output}", output=output)
            continue

        if output.type == "message":
            for content in output.content:
                if hasattr(content, "logprobs") and content.logprobs is not None:
                    logprobs.append(_serialize_object(content.logprobs))

        if output.type == "reasoning":
            # OpenAI doesn't return the reasoning tokens, but we can view the summary if it's enabled
            # https://platform.openai.com/docs/guides/reasoning#reasoning-summaries
            summaries = output.summary
            extra = output.to_dict()
            # we don't need the summary in the extra
            extra.pop("summary")
            reasoning_text = "\n".join(summary.text for summary in summaries)
            reasoning = ReasoningContent(reasoning_text=reasoning_text, extra=extra)

        elif output.type == "function_call":
            try:
                arguments = json.loads(output.arguments)
                tool_calls.append(
                    ToolCall(
                        id=output.id, tool_name=output.name, arguments=arguments, extra={"call_id": output.call_id}
                    )
                )
            except json.JSONDecodeError:
                logger.warning(
                    "The LLM provider returned a malformed JSON string for tool call arguments. This tool call "
                    "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                    "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                    _id=output.id,
                    _name=output.name,
                    _arguments=output.arguments,
                )

    # we save the response as dict because it contains the response id etc.
    meta = responses.to_dict()
    # remove output from meta because it contains tool calls, reasoning, text etc.
    meta.pop("output")

    if logprobs:
        meta["logprobs"] = logprobs

    chat_message = ChatMessage.from_assistant(
        text=responses.output_text if responses.output_text else None,
        reasoning=reasoning,
        tool_calls=tool_calls,
        meta=meta,
    )

    return chat_message

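# Resulting ChatMessage sketch: `meta` is the full response dict minus `output`,
# so it keeps fields such as the response id, model, and usage; when the API
# returned log probabilities they are collected under `meta["logprobs"]`.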

def _convert_response_chunk_to_streaming_chunk(  # pylint: disable=too-many-return-statements
    chunk: ResponseStreamEvent, previous_chunks: list[StreamingChunk], component_info: Optional[ComponentInfo] = None
) -> StreamingChunk:
    """
    Converts the streaming response chunk from the OpenAI Responses API to a StreamingChunk.

    :param chunk: The chunk returned by the OpenAI Responses API.
    :param previous_chunks: A list of previously received StreamingChunks.
    :param component_info: An optional `ComponentInfo` object containing information about the component that
        generated the chunk, such as the component name and type.
    :returns:
        A StreamingChunk object representing the content of the chunk from the OpenAI Responses API.
    """
    if chunk.type == "response.output_item.added":
        # The Responses API always returns reasoning chunks even if there is no summary
        if chunk.item.type == "reasoning":
            extra = chunk.item.to_dict()
            reasoning = ReasoningContent(reasoning_text="", extra=extra)
            return StreamingChunk(
                content="", component_info=component_info, index=chunk.output_index, reasoning=reasoning, start=True
            )

        # the function name is only streamed at the start and end of the function call
        if chunk.item.type == "function_call":
            function = chunk.item.name
            extra = {"type": "function_call", "call_id": chunk.item.call_id, "status": chunk.item.status}
            tool_call = ToolCallDelta(index=chunk.output_index, id=chunk.item.id, tool_name=function, extra=extra)
            return StreamingChunk(
                content="", component_info=component_info, index=chunk.output_index, tool_calls=[tool_call], start=True
            )

    elif chunk.type == "response.completed":
        # This means a full response is finished.
        # If there are tool calls present in the final output, we mark finish_reason as tool_calls; otherwise it's
        # marked as stop.
        return StreamingChunk(
            content="",
            component_info=component_info,
            finish_reason="tool_calls" if any(o.type == "function_call" for o in chunk.response.output) else "stop",
            meta=chunk.to_dict(),
        )

    elif chunk.type == "response.output_text.delta":
        # if item is a ResponseTextDeltaEvent
        meta = chunk.to_dict()
        meta["received_at"] = datetime.now().isoformat()

        # Start is determined by checking if this is the first text delta event of a new output_index:
        # 1) Check if all previous chunks have a different output_index
        # 2) If any chunks do have the same output_index, check if they have content
        # If none of them have content, this is the start of a new text output
        start = all(c.index != chunk.output_index for c in previous_chunks) or all(
            c.content == "" for c in previous_chunks if c.index == chunk.output_index
        )
        return StreamingChunk(
            content=chunk.delta, component_info=component_info, index=chunk.output_index, meta=meta, start=start
        )

    elif chunk.type == "response.reasoning_summary_text.delta":
        # we remove the delta from the extra because it is already in the reasoning_text;
        # the rest of the information needs to be saved for the chat message
        extra = chunk.to_dict()
        extra.pop("delta")
        reasoning = ReasoningContent(reasoning_text=chunk.delta, extra=extra)
        return StreamingChunk(
            content="", component_info=component_info, index=chunk.output_index, reasoning=reasoning, start=False
        )

    # the function arguments are streamed in parts;
    # the function name is not passed in these chunks
    elif chunk.type == "response.function_call_arguments.delta":
        arguments = chunk.delta
        extra = chunk.to_dict()
        extra.pop("delta")
        # in the delta of tool calls there is no call_id,
        # so we use the item_id, which is the function call id
        tool_call = ToolCallDelta(index=chunk.output_index, id=chunk.item_id, arguments=arguments, extra=extra)
        return StreamingChunk(
            content="", component_info=component_info, index=chunk.output_index, tool_calls=[tool_call], start=False
        )

    # we return the rest of the chunks as is
    chunk_message = StreamingChunk(
        content="", component_info=component_info, index=getattr(chunk, "output_index", None), meta=chunk.to_dict()
    )
    return chunk_message

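# Event-to-chunk mapping sketch (any event not listed passes through as a
# meta-only chunk):
#     response.output_item.added (reasoning / function_call) -> start=True chunk
#     response.output_text.delta                             -> text delta chunk
#     response.reasoning_summary_text.delta                  -> reasoning delta
#     response.function_call_arguments.delta                 -> ToolCallDelta chunk
#     response.completed                                     -> finish_reason chunk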

def _convert_streaming_chunks_to_chat_message(chunks: list[StreamingChunk]) -> ChatMessage:
    """
    Connects the streaming chunks into a single ChatMessage.

    :param chunks: The list of all `StreamingChunk` objects.

    :returns: The ChatMessage.
    """
    reasoning = None
    tool_calls = []
    text = "".join([chunk.content for chunk in chunks])
    logprobs: list[dict] = []
    for chunk in chunks:
        logprobs_value = chunk.meta.get("logprobs")
        if logprobs_value is not None:
            logprobs.append(logprobs_value)

    # Process tool calls if present in any chunk
    tool_call_data: dict[str, dict[str, Any]] = {}  # Track tool calls by id
    for chunk in chunks:
        if chunk.tool_calls:
            for tool_call in chunk.tool_calls:
                # here the tool_call.id is fc_id, not call_id
                assert tool_call.id is not None
                # We use the tool call id to track the tool call across chunks
                if tool_call.id not in tool_call_data:
                    tool_call_data[tool_call.id] = {"name": "", "arguments": ""}

                if tool_call.tool_name is not None:
                    # we don't need to append the tool name as it is passed once at the start of the function call
                    tool_call_data[tool_call.id]["name"] = tool_call.tool_name
                if tool_call.arguments is not None:
                    tool_call_data[tool_call.id]["arguments"] += tool_call.arguments
                if tool_call.extra is not None and tool_call.extra.get("type") == "function_call":
                    tool_call_data[tool_call.id]["extra"] = tool_call.extra

        if chunk.reasoning:
            reasoning = chunk.reasoning

    # Convert accumulated tool call data into ToolCall objects
    sorted_keys = sorted(tool_call_data.keys())
    for key in sorted_keys:
        tool_call_dict = tool_call_data[key]
        try:
            arguments = json.loads(tool_call_dict.get("arguments", "{}")) if tool_call_dict.get("arguments") else {}
            extra: dict[str, Any] = tool_call_dict.get("extra", {})
            tool_calls.append(ToolCall(id=key, tool_name=tool_call_dict["name"], arguments=arguments, extra=extra))
        except json.JSONDecodeError:
            logger.warning(
                "The LLM provider returned a malformed JSON string for tool call arguments. This tool call "
                "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                _id=key,
                _name=tool_call_dict["name"],
                _arguments=tool_call_dict["arguments"],
            )

    # the final response is the last chunk with the response metadata
    final_response = chunks[-1].meta.get("response")
    meta: dict[str, Any] = {
        "model": final_response.get("model") if final_response else None,
        "index": 0,
        "response_start_time": final_response.get("created_at") if final_response else None,
        "usage": final_response.get("usage") if final_response else None,
    }

    if logprobs:
        meta["logprobs"] = logprobs

    return ChatMessage.from_assistant(text=text or None, tool_calls=tool_calls, meta=meta, reasoning=reasoning)


def _convert_chat_message_to_responses_api_format(message: ChatMessage) -> list[dict[str, Any]]:
    """
    Convert a ChatMessage to the dictionary format expected by OpenAI's Responses API.

    :param message: The ChatMessage to convert to OpenAI's Responses API format.
    :returns:
        The ChatMessage in the format expected by OpenAI's Responses API.

    :raises ValueError:
        If the message format is invalid.
    """
    text_contents = message.texts
    tool_calls = message.tool_calls
    tool_call_results = message.tool_call_results
    images = message.images
    reasonings = message.reasonings

    if not text_contents and not tool_calls and not tool_call_results and not images and not reasonings:
        raise ValueError(
            "A `ChatMessage` must contain at least one `TextContent`, `ToolCall`, `ToolCallResult`, "
            "`ImageContent`, or `ReasoningContent`."
        )
    if len(tool_call_results) > 0 and len(message._content) > 1:
        raise ValueError(
            "For OpenAI compatibility, a `ChatMessage` with a `ToolCallResult` cannot contain any other content."
        )

    formatted_messages: list[dict[str, Any]] = []
    openai_msg: dict[str, Any] = {"role": message._role.value}
    if message._name is not None:
        openai_msg["name"] = message._name

    # user message
    if message._role.value == "user":
        if len(message._content) == 1 and isinstance(message._content[0], TextContent):
            openai_msg["content"] = message.text
            return [openai_msg]

        # if the user message contains a list of text and images, OpenAI expects a list of dictionaries
        content = []
        for part in message._content:
            if isinstance(part, TextContent):
                text_type = "input_text"
                content.append({"type": text_type, "text": part.text})
            elif isinstance(part, ImageContent):
                image_item: dict[str, Any]
                image_item = {
                    "type": "input_image",
                    # If no MIME type is provided, default to JPEG.
                    # OpenAI API appears to tolerate MIME type mismatches.
                    "image_url": f"data:{part.mime_type or 'image/jpeg'};base64,{part.base64_image}",
                }

                content.append(image_item)

        openai_msg["content"] = content
        return [openai_msg]

    # tool message
    if tool_call_results:
        formatted_tool_results = []
        for result in tool_call_results:
            if result.origin.id is not None:
                tool_result = {
                    "type": "function_call_output",
                    "call_id": result.origin.extra.get("call_id") if result.origin.extra else "",
                    "output": result.result,
                }
                formatted_tool_results.append(tool_result)
        formatted_messages.extend(formatted_tool_results)

    # Note: the API expects a reasoning id even if there is no reasoning text;
    # function calls without reasoning ids are not supported by the API
    if reasonings:
        formatted_reasonings = []
        for reasoning in reasonings:
            reasoning_item = {
                **(reasoning.extra),
                "summary": [{"text": reasoning.reasoning_text, "type": "summary_text"}],
            }
            formatted_reasonings.append(reasoning_item)
        formatted_messages.extend(formatted_reasonings)

    if tool_calls:
        formatted_tool_calls = []
        for tc in tool_calls:
            openai_tool_call = {
                "type": "function_call",
                "name": tc.tool_name,
                # We disable ensure_ascii so special chars like emojis are not converted
                "arguments": json.dumps(tc.arguments, ensure_ascii=False),
                "id": tc.id,
                "call_id": tc.extra.get("call_id") if tc.extra else "",
            }

            formatted_tool_calls.append(openai_tool_call)
        formatted_messages.extend(formatted_tool_calls)

    # system and assistant messages
    if text_contents:
        openai_msg["content"] = " ".join(text_contents)
        formatted_messages.append(openai_msg)

    return formatted_messages
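

# Conversion sketch: a plain user message stays {"role": "user", "content": "..."};
# an assistant tool call becomes a top-level "function_call" item such as
#     {"type": "function_call", "name": "weather", "arguments": "{\"city\": \"Paris\"}",
#      "id": "fc_123", "call_id": "call_123"}
# (the name and ids here are hypothetical), and a tool result maps to a
# "function_call_output" item keyed by the same call_id.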