
deepset-ai / haystack, build 19292414095 (push via web-flow on GitHub), 12 Nov 2025 09:19AM UTC
Commit: ci: disable Readme sync workflow on push events (#10058)
Repository coverage: 91.44% (+0.02% from 91.42%), 13716 of 15000 relevant lines covered, 0.91 hits per line.

Source file (50.86% covered):
haystack/components/generators/chat/openai_responses.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import json
import os
from datetime import datetime
from typing import Any, Optional, Union

from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from openai.lib._pydantic import to_strict_json_schema
from openai.types.responses import ParsedResponse, Response, ResponseOutputRefusal, ResponseStreamEvent
from pydantic import BaseModel

from haystack import component, default_from_dict, default_to_dict, logging
from haystack.components.generators.utils import _serialize_object
from haystack.dataclasses import (
    AsyncStreamingCallbackT,
    ChatMessage,
    ComponentInfo,
    ImageContent,
    ReasoningContent,
    StreamingCallbackT,
    StreamingChunk,
    SyncStreamingCallbackT,
    TextContent,
    ToolCall,
    ToolCallDelta,
    select_streaming_callback,
)
from haystack.tools import (
    ToolsType,
    _check_duplicate_tool_names,
    deserialize_tools_or_toolset_inplace,
    flatten_tools_or_toolsets,
    serialize_tools_or_toolset,
    warm_up_tools,
)
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable
from haystack.utils.http_client import init_http_client

logger = logging.getLogger(__name__)


@component
class OpenAIResponsesChatGenerator:
    """
    Completes chats using OpenAI's Responses API.

    It works with the GPT-4 and o-series models and supports streaming responses
    from the OpenAI API. It uses the [ChatMessage](https://docs.haystack.deepset.ai/docs/chatmessage)
    format for input and output.

    You can customize how the text is generated by passing parameters to the
    OpenAI API. Use the `**generation_kwargs` argument when you initialize
    the component or when you run it. Any parameter that works with
    `openai.Responses.create` will work here too.

    For details on OpenAI API parameters, see
    [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses).

    ### Usage example

    ```python
    from haystack.components.generators.chat import OpenAIResponsesChatGenerator
    from haystack.dataclasses import ChatMessage

    messages = [ChatMessage.from_user("What's Natural Language Processing?")]

    client = OpenAIResponsesChatGenerator(generation_kwargs={"reasoning": {"effort": "low", "summary": "auto"}})
    response = client.run(messages)
    print(response)
    ```
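
    To stream tokens as they arrive, pass a `streaming_callback`. A minimal
    sketch, assuming the built-in `print_streaming_chunk` helper fits your needs:

    ```python
    from haystack.components.generators.chat import OpenAIResponsesChatGenerator
    from haystack.components.generators.utils import print_streaming_chunk
    from haystack.dataclasses import ChatMessage

    client = OpenAIResponsesChatGenerator(streaming_callback=print_streaming_chunk)
    client.run([ChatMessage.from_user("Summarize NLP in one sentence.")])
    ```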
    """

    def __init__(
        self,
        *,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-5-mini",
        streaming_callback: Optional[StreamingCallbackT] = None,
        api_base_url: Optional[str] = None,
        organization: Optional[str] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: bool = False,
        http_client_kwargs: Optional[dict[str, Any]] = None,
    ):
        """
        Creates an instance of OpenAIResponsesChatGenerator. Uses OpenAI's gpt-5-mini by default.

        Before initializing the component, you can set the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES'
        environment variables to override the `timeout` and `max_retries` parameters respectively
        in the OpenAI client.

        :param api_key: The OpenAI API key.
            You can set it with the `OPENAI_API_KEY` environment variable, or pass it with this parameter
            during initialization.
        :param model: The name of the model to use.
        :param streaming_callback: A callback function that is called when a new token is received from the stream.
            The callback function accepts [StreamingChunk](https://docs.haystack.deepset.ai/docs/data-classes#streamingchunk)
            as an argument.
        :param api_base_url: An optional base URL.
        :param organization: Your organization ID, defaults to `None`. See
            [production best practices](https://platform.openai.com/docs/guides/production-best-practices/setting-up-your-organization).
        :param generation_kwargs: Other parameters to use for the model. These parameters are sent
            directly to the OpenAI endpoint.
            See the OpenAI [documentation](https://platform.openai.com/docs/api-reference/responses) for
            more details.
            Some of the supported parameters:
            - `temperature`: What sampling temperature to use. Higher values like 0.8 will make the output more random,
                while lower values like 0.2 will make it more focused and deterministic.
            - `top_p`: An alternative to sampling with temperature, called nucleus sampling, where the model
                considers the results of the tokens with top_p probability mass. For example, 0.1 means only the tokens
                comprising the top 10% probability mass are considered.
            - `previous_response_id`: The ID of the previous response.
                Use this to create multi-turn conversations.
            - `text_format`: A Pydantic model that enforces the structure of the model's response.
                If provided, the output will always be validated against this
                format (unless the model returns a tool call).
                For details, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs).
            - `text`: A JSON schema that enforces the structure of the model's response.
                If provided, the output will always be validated against this
                format (unless the model returns a tool call).
                Notes:
                - Both JSON Schema and Pydantic models are supported for the latest models, starting from GPT-4o.
                - If both are provided, `text_format` takes precedence and the JSON schema passed to `text` is ignored.
                - Currently, this component doesn't support streaming for structured outputs.
                - Older models only support a basic version of structured outputs through `{"type": "json_object"}`.
                    For detailed information on JSON mode, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs#json-mode).
            - `reasoning`: A dictionary of parameters for reasoning. For example:
                - `summary`: The summary of the reasoning.
                - `effort`: The level of effort to put into the reasoning. Can be `low`, `medium`, or `high`.
                - `generate_summary`: Whether to generate a summary of the reasoning.
                Note: OpenAI does not return the reasoning tokens, but the summary can be viewed if it's enabled.
                For details, see the [OpenAI Reasoning documentation](https://platform.openai.com/docs/guides/reasoning).
        :param timeout:
            Timeout for OpenAI client calls. If not set, it defaults to either the
            `OPENAI_TIMEOUT` environment variable or 30 seconds.
        :param max_retries:
            Maximum number of retries to contact OpenAI after an internal error.
            If not set, it defaults to either the `OPENAI_MAX_RETRIES` environment variable or 5.
        :param tools:
            The tools that the model can use to prepare calls. This parameter accepts either a
            mixed list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of
            OpenAI/MCP tool definition dictionaries.
            Note: You cannot pass OpenAI/MCP tools and Haystack tools together.
            For details on tool support, see the [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create#responses-create-tools).
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `False`, the model may not exactly
            follow the schema provided in the `parameters` field of the tool definition. In the Responses API, tool
            calls are strict by default.
        :param http_client_kwargs:
            A dictionary of keyword arguments to configure a custom `httpx.Client` or `httpx.AsyncClient`.
            For more information, see the [HTTPX documentation](https://www.python-httpx.org/api/#client).
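
        For example, to enforce structured output, pass a Pydantic model via
        `generation_kwargs`. A minimal sketch; the `CityInfo` model here is
        illustrative, not part of the library:

        ```python
        from pydantic import BaseModel

        class CityInfo(BaseModel):
            city: str
            country: str

        client = OpenAIResponsesChatGenerator(generation_kwargs={"text_format": CityInfo})
        ```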
        """
        self.api_key = api_key
        self.model = model
        self.generation_kwargs = generation_kwargs or {}
        self.streaming_callback = streaming_callback
        self.api_base_url = api_base_url
        self.organization = organization
        self.timeout = timeout
        self.max_retries = max_retries
        self.tools = tools  # Store tools as-is, whether it's a list or a Toolset
        self.tools_strict = tools_strict
        self.http_client_kwargs = http_client_kwargs

        if timeout is None:
            timeout = float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
        if max_retries is None:
            max_retries = int(os.environ.get("OPENAI_MAX_RETRIES", "5"))

        resolved_api_key = api_key.resolve_value() if isinstance(api_key, Secret) else api_key
        client_kwargs: dict[str, Any] = {
            "api_key": resolved_api_key,
            "organization": organization,
            "base_url": api_base_url,
            "timeout": timeout,
            "max_retries": max_retries,
        }

        self.client = OpenAI(http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_kwargs)
        self.async_client = AsyncOpenAI(
            http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_kwargs
        )
        self._is_warmed_up = False

    def warm_up(self):
        """
        Warm up the OpenAI responses chat generator.

        This will warm up the tools registered in the chat generator.
        This method is idempotent and will only warm up the tools once.
        """
        if not self._is_warmed_up:
            # Guard against an empty list before indexing into self.tools
            is_openai_tool = isinstance(self.tools, list) and len(self.tools) > 0 and isinstance(self.tools[0], dict)
            # We only warm up Haystack tools, not OpenAI/MCP tools
            # The type ignore is needed because mypy cannot infer the type correctly
            if not is_openai_tool:
                warm_up_tools(self.tools)  # type: ignore[arg-type]
            self._is_warmed_up = True

    def _get_telemetry_data(self) -> dict[str, Any]:
        """
        Data that is sent to Posthog for usage analytics.
        """
        return {"model": self.model}

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize this component to a dictionary.

        :returns:
            The serialized component as a dictionary.
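
        A sketch of a serialization round trip, assuming `OPENAI_API_KEY` is set
        in the environment:

        ```python
        generator = OpenAIResponsesChatGenerator()
        data = generator.to_dict()
        restored = OpenAIResponsesChatGenerator.from_dict(data)
        ```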
        """
        callback_name = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        generation_kwargs = self.generation_kwargs.copy()
        text_format = generation_kwargs.pop("text_format", None)

        # If the response format is a Pydantic model, it's converted to openai's json schema format
        # If it's already a json schema, it's left as is
        if text_format and isinstance(text_format, type) and issubclass(text_format, BaseModel):
            json_schema = {
                "format": {
                    "type": "json_schema",
                    "name": text_format.__name__,
                    "strict": True,
                    "schema": to_strict_json_schema(text_format),
                }
            }
            # json schema needs to be passed to text parameter instead of text_format
            generation_kwargs["text"] = json_schema

        # OpenAI/MCP tools are passed as list of dictionaries
        serialized_tools: Union[dict[str, Any], list[dict[str, Any]], None]
        if self.tools and isinstance(self.tools, list) and isinstance(self.tools[0], dict):
            # mypy can't infer that self.tools is list[dict] here
            serialized_tools = self.tools  # type: ignore[assignment]
        else:
            serialized_tools = serialize_tools_or_toolset(self.tools)  # type: ignore[arg-type]

        return default_to_dict(
            self,
            model=self.model,
            streaming_callback=callback_name,
            api_base_url=self.api_base_url,
            organization=self.organization,
            generation_kwargs=generation_kwargs,
            api_key=self.api_key.to_dict(),
            timeout=self.timeout,
            max_retries=self.max_retries,
            tools=serialized_tools,
            tools_strict=self.tools_strict,
            http_client_kwargs=self.http_client_kwargs,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "OpenAIResponsesChatGenerator":
        """
        Deserialize this component from a dictionary.

        :param data: The dictionary representation of this component.
        :returns:
            The deserialized component instance.
        """
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])

        # we only deserialize the tools if they are haystack tools
        # because openai tools are not serialized in the same way

        tools = data["init_parameters"].get("tools")
        if tools and (
            isinstance(tools, dict)
            and tools.get("type") == "haystack.tools.toolset.Toolset"
            or isinstance(tools, list)
            and tools[0].get("type") == "haystack.tools.tool.Tool"
        ):
            deserialize_tools_or_toolset_inplace(data["init_parameters"], key="tools")

        init_params = data.get("init_parameters", {})
        serialized_callback_handler = init_params.get("streaming_callback")

        if serialized_callback_handler:
            data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)
        return default_from_dict(cls, data)

    @component.output_types(replies=list[ChatMessage])
    def run(
        self,
        messages: list[ChatMessage],
        *,
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Invokes response generation based on the provided messages and generation parameters.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create).
        :param tools:
            The tools that the model can use to prepare calls. If set, it overrides the
            `tools` parameter set during component initialization. This parameter accepts either a
            mixed list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of
            OpenAI/MCP tool definition dictionaries.
            Note: You cannot pass OpenAI/MCP tools and Haystack tools together.
            For details on tool support, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create#responses-create-tools).
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `False`, the model may not exactly
            follow the schema provided in the `parameters` field of the tool definition. In the Responses API, tool
            calls are strict by default.
            If set, it overrides the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
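
        A minimal tool-calling sketch; the `get_weather` function and its JSON
        schema are illustrative, not part of the library:

        ```python
        from haystack.components.generators.chat import OpenAIResponsesChatGenerator
        from haystack.dataclasses import ChatMessage
        from haystack.tools import Tool

        def get_weather(city: str) -> str:
            return f"Sunny in {city}"

        weather_tool = Tool(
            name="get_weather",
            description="Get the weather for a city.",
            parameters={"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]},
            function=get_weather,
        )

        client = OpenAIResponsesChatGenerator()
        result = client.run([ChatMessage.from_user("What's the weather in Paris?")], tools=[weather_tool])
        print(result["replies"][0].tool_calls)
        ```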
        """
        if len(messages) == 0:
            return {"replies": []}

        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
        )
        responses: Union[Stream[ResponseStreamEvent], Response]

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )
        openai_endpoint = api_args.pop("openai_endpoint")
        openai_endpoint_method = getattr(self.client.responses, openai_endpoint)
        responses = openai_endpoint_method(**api_args)

        if streaming_callback is not None:
            response_output = self._handle_stream_response(
                responses,  # type: ignore
                streaming_callback,
            )
        else:
            assert isinstance(responses, Response), "Unexpected response type for non-streaming request."
            response_output = [_convert_response_to_chat_message(responses)]
        return {"replies": response_output}

    @component.output_types(replies=list[ChatMessage])
    async def run_async(
        self,
        messages: list[ChatMessage],
        *,
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Asynchronously invokes response generation based on the provided messages and generation parameters.

        This is the asynchronous version of the `run` method. It has the same parameters and return values
        but can be used with `await` in async code.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
            Must be a coroutine.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create).
        :param tools:
            The tools that the model can use to prepare calls. If set, it overrides the
            `tools` parameter set during component initialization. This parameter accepts either a
            mixed list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of
            OpenAI/MCP tool definition dictionaries.
            Note: You cannot pass OpenAI/MCP tools and Haystack tools together.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it overrides the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
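
        A minimal async sketch:

        ```python
        import asyncio

        from haystack.components.generators.chat import OpenAIResponsesChatGenerator
        from haystack.dataclasses import ChatMessage

        async def main():
            client = OpenAIResponsesChatGenerator()
            result = await client.run_async([ChatMessage.from_user("What's Natural Language Processing?")])
            print(result["replies"][0].text)

        asyncio.run(main())
        ```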
        """
        # validate and select the streaming callback
        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
        )
        responses: Union[AsyncStream[ResponseStreamEvent], Response]

        if len(messages) == 0:
            return {"replies": []}

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )

        openai_endpoint = api_args.pop("openai_endpoint")
        openai_endpoint_method = getattr(self.async_client.responses, openai_endpoint)
        responses = await openai_endpoint_method(**api_args)

        if streaming_callback is not None:
            response_output = await self._handle_async_stream_response(
                responses,  # type: ignore
                streaming_callback,
            )

        else:
            assert isinstance(responses, Response), "Unexpected response type for non-streaming request."
            response_output = [_convert_response_to_chat_message(responses)]
        return {"replies": response_output}

    def _prepare_api_call(  # noqa: PLR0913
        self,
        *,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: Optional[bool] = None,
    ) -> dict[str, Any]:
        # update generation kwargs by merging with the generation kwargs passed to the run method
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}

        # adapt ChatMessage(s) to the format expected by the OpenAI API
        openai_formatted_messages: list[dict[str, Any]] = []
        for message in messages:
            openai_formatted_messages.extend(_convert_chat_message_to_responses_api_format(message))

        tools = tools or self.tools
        tools_strict = tools_strict if tools_strict is not None else self.tools_strict

        openai_tools = {}
        # Build tool definitions
        if tools:
            tool_definitions: list[Any] = []
            if isinstance(tools, list) and isinstance(tools[0], dict):
                # Predefined OpenAI/MCP-style tools
                tool_definitions = tools

            # Convert all tool objects to the correct OpenAI-compatible structure
            else:
                # mypy can't infer that tools is ToolsType here
                flattened_tools = flatten_tools_or_toolsets(tools)  # type: ignore[arg-type]
                _check_duplicate_tool_names(flattened_tools)
                for t in flattened_tools:
                    function_spec = {**t.tool_spec}
                    if not tools_strict:
                        function_spec["strict"] = False
                    function_spec["parameters"]["additionalProperties"] = False
                    tool_definitions.append({"type": "function", **function_spec})

            openai_tools = {"tools": tool_definitions}

        base_args = {"model": self.model, "input": openai_formatted_messages, **openai_tools, **generation_kwargs}

        # if both `text_format` and `text` are provided, `text_format` takes precedence
        # and json schema passed to `text` is ignored
        if generation_kwargs.get("text_format") or generation_kwargs.get("text"):
            return {**base_args, "stream": streaming_callback is not None, "openai_endpoint": "parse"}
        # we pass a key `openai_endpoint` as a hint to the run method to use the create or parse endpoint
        # this key will be removed before the API call is made
        return {**base_args, "stream": streaming_callback is not None, "openai_endpoint": "create"}

    def _handle_stream_response(self, responses: Stream, callback: SyncStreamingCallbackT) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []

        for openai_chunk in responses:  # pylint: disable=not-an-iterable
            chunk_delta = _convert_response_chunk_to_streaming_chunk(
                chunk=openai_chunk, previous_chunks=chunks, component_info=component_info
            )
            chunks.append(chunk_delta)
            callback(chunk_delta)
        chat_message = _convert_streaming_chunks_to_chat_message(chunks=chunks)
        return [chat_message]

    async def _handle_async_stream_response(
        self, responses: AsyncStream, callback: AsyncStreamingCallbackT
    ) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []
        async for openai_chunk in responses:  # pylint: disable=not-an-iterable
            chunk_delta = _convert_response_chunk_to_streaming_chunk(
                chunk=openai_chunk, previous_chunks=chunks, component_info=component_info
            )
            chunks.append(chunk_delta)
            await callback(chunk_delta)
        chat_message = _convert_streaming_chunks_to_chat_message(chunks=chunks)
        return [chat_message]


def _convert_response_to_chat_message(responses: Union[Response, ParsedResponse]) -> ChatMessage:
    """
    Converts the non-streaming response from the OpenAI API to a ChatMessage.

    :param responses: The responses returned by the OpenAI API.
    :returns: The ChatMessage.
    """

    tool_calls = []
    reasoning = None
    logprobs: list[dict] = []
    for output in responses.output:
        if isinstance(output, ResponseOutputRefusal):
            logger.warning("OpenAI returned a refusal output: {output}", output=output)
            continue

        if output.type == "message":
            for content in output.content:
                if hasattr(content, "logprobs") and content.logprobs is not None:
                    logprobs.append(_serialize_object(content.logprobs))

        if output.type == "reasoning":
            # openai doesn't return the reasoning tokens, but we can view the summary if it's enabled
            # https://platform.openai.com/docs/guides/reasoning#reasoning-summaries
            summaries = output.summary
            extra = output.to_dict()
            # we don't need the summary in the extra
            extra.pop("summary")
            reasoning_text = "\n".join(summary.text for summary in summaries)
            reasoning = ReasoningContent(reasoning_text=reasoning_text, extra=extra)

        elif output.type == "function_call":
            try:
                arguments = json.loads(output.arguments)
                tool_calls.append(
                    ToolCall(
                        id=output.id, tool_name=output.name, arguments=arguments, extra={"call_id": output.call_id}
                    )
                )
            except json.JSONDecodeError:
                logger.warning(
                    "The LLM provider returned a malformed JSON string for tool call arguments. This tool call "
                    "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                    "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                    _id=output.id,
                    _name=output.name,
                    _arguments=output.arguments,
                )
                arguments = {}

    # we save the response as dict because it contains resp_id etc.
    meta = responses.to_dict()

    # remove output from meta because it contains toolcalls, reasoning, text etc.
    meta.pop("output")

    if logprobs:
        meta["logprobs"] = logprobs

    chat_message = ChatMessage.from_assistant(
        text=responses.output_text if responses.output_text else None,
        reasoning=reasoning,
        tool_calls=tool_calls,
        meta=meta,
    )

    return chat_message


def _convert_response_chunk_to_streaming_chunk(  # pylint: disable=too-many-return-statements
    chunk: ResponseStreamEvent, previous_chunks: list[StreamingChunk], component_info: Optional[ComponentInfo] = None
) -> StreamingChunk:
    """
    Converts the streaming response chunk from the OpenAI Responses API to a StreamingChunk.

    :param chunk: The chunk returned by the OpenAI Responses API.
    :param previous_chunks: A list of previously received StreamingChunks.
    :param component_info: An optional `ComponentInfo` object containing information about the component that
        generated the chunk, such as the component name and type.
    :returns:
        A StreamingChunk object representing the content of the chunk from the OpenAI Responses API.
    """
    if chunk.type == "response.output_item.added":
        # Responses API always returns reasoning chunks even if there is no summary
        if chunk.item.type == "reasoning":
            reasoning = ReasoningContent(reasoning_text="", extra=chunk.item.to_dict())
            return StreamingChunk(
                content="",
                component_info=component_info,
                index=chunk.output_index,
                reasoning=reasoning,
                start=True,
                meta={"received_at": datetime.now().isoformat()},
            )

        # the function name is only streamed at the start and end of the function call
        if chunk.item.type == "function_call":
            tool_call = ToolCallDelta(
                index=chunk.output_index, id=chunk.item.id, tool_name=chunk.item.name, extra=chunk.item.to_dict()
            )
            return StreamingChunk(
                content="",
                component_info=component_info,
                index=chunk.output_index,
                tool_calls=[tool_call],
                start=True,
                meta={"received_at": datetime.now().isoformat()},
            )

    elif chunk.type == "response.completed":
        # This means a full response is finished
        # If there are tool_calls present in the final output we mark finish_reason as tool_calls otherwise it's
        # marked as stop
        return StreamingChunk(
            content="",
            component_info=component_info,
            finish_reason="tool_calls" if any(o.type == "function_call" for o in chunk.response.output) else "stop",
            meta={**chunk.to_dict(), "received_at": datetime.now().isoformat()},
        )

    elif chunk.type == "response.output_text.delta":
        # Start is determined by checking if this is the first text delta event of a new output_index
        # 1) Check if all previous chunks have different output_index
        # 2) If any chunks do have the same output_index, check if they have content
        # If none of them have content, this is the start of a new text output
        start = all(c.index != chunk.output_index for c in previous_chunks) or all(
            c.content == "" for c in previous_chunks if c.index == chunk.output_index
        )
        return StreamingChunk(
            content=chunk.delta,
            component_info=component_info,
            index=chunk.output_index,
            start=start,
            meta={**chunk.to_dict(), "received_at": datetime.now().isoformat()},
        )

    elif chunk.type == "response.reasoning_summary_text.delta":
        # We remove the delta from the extra because it is already in the reasoning_text
        # Remaining information needs to be saved for chat message
        extra = chunk.to_dict()
        extra.pop("delta")
        reasoning = ReasoningContent(reasoning_text=chunk.delta, extra=extra)
        return StreamingChunk(
            content="",
            component_info=component_info,
            index=chunk.output_index,
            reasoning=reasoning,
            meta={"received_at": datetime.now().isoformat()},
        )

    # the function arguments are streamed in parts
    # function name is not passed in these chunks
    elif chunk.type == "response.function_call_arguments.delta":
        arguments = chunk.delta
        extra = chunk.to_dict()
        extra.pop("delta")
        # in delta of tool calls there is no call_id so we use the item_id which is the function call id
        tool_call = ToolCallDelta(index=chunk.output_index, id=chunk.item_id, arguments=arguments, extra=extra)
        return StreamingChunk(
            content="",
            component_info=component_info,
            index=chunk.output_index,
            tool_calls=[tool_call],
            meta={"received_at": datetime.now().isoformat()},
        )

    # we return the rest of the chunk as-is
    chunk_message = StreamingChunk(
        content="",
        component_info=component_info,
        index=getattr(chunk, "output_index", None),
        meta={**chunk.to_dict(), "received_at": datetime.now().isoformat()},
    )
    return chunk_message


def _convert_streaming_chunks_to_chat_message(chunks: list[StreamingChunk]) -> ChatMessage:
    """
    Connects the streaming chunks into a single ChatMessage.

    :param chunks: The list of all `StreamingChunk` objects.

    :returns: The ChatMessage.
    """

    # Get the full text by concatenating all text chunks
    text = "".join([chunk.content for chunk in chunks])
    logprobs = []
    for chunk in chunks:
        if chunk.meta.get("logprobs"):
            logprobs.append(chunk.meta.get("logprobs"))

    # Gather reasoning information if present
    reasoning_id = None
    reasoning_text = ""
    for chunk in chunks:
        if chunk.reasoning:
            reasoning_text += chunk.reasoning.reasoning_text
            if chunk.reasoning.extra.get("id"):
                reasoning_id = chunk.reasoning.extra.get("id")

    # Process tool calls if present in any chunk
    tool_call_data: dict[str, dict[str, Any]] = {}  # Track tool calls by id
    for chunk in chunks:
        if chunk.tool_calls:
            for tool_call in chunk.tool_calls:
                # here the tool_call.id is fc_id not call_id
                assert tool_call.id is not None
                # We use the tool call id to track the tool call across chunks
                if tool_call.id not in tool_call_data:
                    tool_call_data[tool_call.id] = {"name": "", "arguments": ""}

                if tool_call.arguments is not None:
                    tool_call_data[tool_call.id]["arguments"] += tool_call.arguments

                # We capture the tool name from one of the chunks
                if tool_call.tool_name is not None:
                    tool_call_data[tool_call.id]["name"] = tool_call.tool_name

                # We capture the call_id from one of the chunks
                if tool_call.extra and "call_id" in tool_call.extra:
                    tool_call_data[tool_call.id]["extra"] = {"call_id": tool_call.extra["call_id"]}

    # Convert accumulated tool call data into ToolCall objects
    tool_calls = []
    sorted_keys = sorted(tool_call_data.keys())
    for key in sorted_keys:
        tool_call_dict = tool_call_data[key]
        try:
            arguments = json.loads(tool_call_dict.get("arguments", "{}")) if tool_call_dict.get("arguments") else {}
            extra: dict[str, Any] = tool_call_dict.get("extra", {})
            tool_calls.append(ToolCall(id=key, tool_name=tool_call_dict["name"], arguments=arguments, extra=extra))
        except json.JSONDecodeError:
            logger.warning(
                "The LLM provider returned a malformed JSON string for tool call arguments. This tool call "
                "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                _id=key,
                _name=tool_call_dict["name"],
                _arguments=tool_call_dict["arguments"],
            )

    # We dump the entire final response into meta to be consistent with non-streaming response
    final_response = chunks[-1].meta.get("response") or {}
    final_response.pop("output", None)
    if logprobs:
        final_response["logprobs"] = logprobs

    # Add reasoning content if both id and text are available
    reasoning = None
    if reasoning_id and reasoning_text:
        reasoning = ReasoningContent(reasoning_text=reasoning_text, extra={"id": reasoning_id, "type": "reasoning"})

    return ChatMessage.from_assistant(
        text=text or None, tool_calls=tool_calls, meta=final_response, reasoning=reasoning
    )


def _convert_chat_message_to_responses_api_format(message: ChatMessage) -> list[dict[str, Any]]:
    """
    Convert a ChatMessage to the dictionary format expected by OpenAI's Responses API.

    :param message: The ChatMessage to convert to OpenAI's Responses API format.
    :returns:
        The ChatMessage in the format expected by OpenAI's Responses API.

    :raises ValueError:
        If the message format is invalid.
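
    For example, based on the conversion rules below, a plain user message with a
    single text part becomes a single Responses API item:

    ```python
    message = ChatMessage.from_user("Hello!")
    _convert_chat_message_to_responses_api_format(message)
    # -> [{"role": "user", "content": "Hello!"}]
    ```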
    """
    text_contents = message.texts
    tool_calls = message.tool_calls
    tool_call_results = message.tool_call_results
    images = message.images
    reasonings = message.reasonings

    if not text_contents and not tool_calls and not tool_call_results and not images and not reasonings:
        raise ValueError(
            """A `ChatMessage` must contain at least one `TextContent`, `ToolCall`, `ToolCallResult`,
              `ImageContent`, or `ReasoningContent`."""
        )
    if len(tool_call_results) > 0 and len(message._content) > 1:
        raise ValueError(
            "For OpenAI compatibility, a `ChatMessage` with a `ToolCallResult` cannot contain any other content."
        )

    formatted_messages: list[dict[str, Any]] = []
    openai_msg: dict[str, Any] = {"role": message._role.value}
    if message._name is not None:
        openai_msg["name"] = message._name

    # user message
    if message._role.value == "user":
        if len(message._content) == 1 and isinstance(message._content[0], TextContent):
            openai_msg["content"] = message.text
            return [openai_msg]

        # if the user message contains a list of text and images, OpenAI expects a list of dictionaries
        content = []
        for part in message._content:
            if isinstance(part, TextContent):
                text_type = "input_text"
                content.append({"type": text_type, "text": part.text})
            elif isinstance(part, ImageContent):
                image_item: dict[str, Any]
                image_item = {
                    "type": "input_image",
                    # If no MIME type is provided, default to JPEG.
                    # OpenAI API appears to tolerate MIME type mismatches.
                    "image_url": f"data:{part.mime_type or 'image/jpeg'};base64,{part.base64_image}",
                }

                content.append(image_item)

        openai_msg["content"] = content
        return [openai_msg]

    # tool message
    if tool_call_results:
        formatted_tool_results = []
        for result in tool_call_results:
            if result.origin.id is not None:
                tool_result = {
                    "type": "function_call_output",
                    "call_id": result.origin.extra.get("call_id") if result.origin.extra else "",
                    "output": result.result,
                }
                formatted_tool_results.append(tool_result)
        formatted_messages.extend(formatted_tool_results)

    # Note: the API expects a reasoning id even if there is no reasoning text
    # function calls without reasoning ids are not supported by the API
    if reasonings:
        formatted_reasonings = []
        for reasoning in reasonings:
            reasoning_item = {
                **(reasoning.extra),
                "summary": [{"text": reasoning.reasoning_text, "type": "summary_text"}],
            }
            formatted_reasonings.append(reasoning_item)
        formatted_messages.extend(formatted_reasonings)

    if tool_calls:
        formatted_tool_calls = []
        for tc in tool_calls:
            openai_tool_call = {
                "type": "function_call",
                # We disable ensure_ascii so special chars like emojis are not converted
                "name": tc.tool_name,
                "arguments": json.dumps(tc.arguments, ensure_ascii=False),
                "id": tc.id,
                "call_id": tc.extra.get("call_id") if tc.extra else "",
            }

            formatted_tool_calls.append(openai_tool_call)
        formatted_messages.extend(formatted_tool_calls)

    # system and assistant messages

    if text_contents:
        openai_msg["content"] = " ".join(text_contents)
        formatted_messages.append(openai_msg)

    return formatted_messages