deepset-ai / haystack · build 18530018322
15 Oct 2025 01:10PM UTC · coverage: 92.075% (-0.03%) from 92.103%

Pull Request #9880 (draft): Expand tools param to include list[Toolset]
Merge 6dad544fe into cfa5d2761

13279 of 14422 relevant lines covered (92.07%) · 0.92 hits per line

Source file: haystack/components/generators/chat/openai.py (97.27% covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import json
import os
from datetime import datetime
from typing import Any, Optional, Union

from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from openai.lib._pydantic import to_strict_json_schema
from openai.types.chat import (
    ChatCompletion,
    ChatCompletionChunk,
    ChatCompletionMessage,
    ChatCompletionMessageCustomToolCall,
    ParsedChatCompletion,
    ParsedChatCompletionMessage,
)
from openai.types.chat.chat_completion import Choice
from openai.types.chat.chat_completion_chunk import Choice as ChunkChoice
from pydantic import BaseModel

from haystack import component, default_from_dict, default_to_dict, logging
from haystack.components.generators.utils import _convert_streaming_chunks_to_chat_message
from haystack.dataclasses import (
    AsyncStreamingCallbackT,
    ChatMessage,
    ComponentInfo,
    FinishReason,
    StreamingCallbackT,
    StreamingChunk,
    SyncStreamingCallbackT,
    ToolCall,
    ToolCallDelta,
    select_streaming_callback,
)
from haystack.tools import (
    Tool,
    Toolset,
    _check_duplicate_tool_names,
    deserialize_tools_or_toolset_inplace,
    flatten_tools_or_toolsets,
    serialize_tools_or_toolset,
)
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable
from haystack.utils.http_client import init_http_client

logger = logging.getLogger(__name__)


@component
class OpenAIChatGenerator:
    """
    Completes chats using OpenAI's large language models (LLMs).

    It works with the gpt-4 and o-series models and supports streaming responses
    from the OpenAI API. It uses the [ChatMessage](https://docs.haystack.deepset.ai/docs/chatmessage)
    format for input and output.

    You can customize how the text is generated by passing parameters to the
    OpenAI API. Use the `generation_kwargs` argument when you initialize
    the component or when you run it. Any parameter that works with
    `openai.ChatCompletion.create` will work here too.

    For details on OpenAI API parameters, see
    [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat).

    ### Usage example

    ```python
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.dataclasses import ChatMessage

    messages = [ChatMessage.from_user("What's Natural Language Processing?")]

    client = OpenAIChatGenerator()
    response = client.run(messages)
    print(response)
    ```
    Output:
    ```
    {'replies':
        [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=
        [TextContent(text="Natural Language Processing (NLP) is a branch of artificial intelligence
            that focuses on enabling computers to understand, interpret, and generate human language in
            a way that is meaningful and useful.")],
         _name=None,
         _meta={'model': 'gpt-4o-mini', 'index': 0, 'finish_reason': 'stop',
         'usage': {'prompt_tokens': 15, 'completion_tokens': 36, 'total_tokens': 51}})
        ]
    }
    ```
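
    To stream tokens as they are generated, pass a streaming callback. A minimal sketch using
    Haystack's built-in `print_streaming_chunk` helper:
    ```python
    from haystack.components.generators.utils import print_streaming_chunk

    streaming_client = OpenAIChatGenerator(streaming_callback=print_streaming_chunk)
    streaming_client.run([ChatMessage.from_user("What's Natural Language Processing?")])
    ```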
    """

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-4o-mini",
        streaming_callback: Optional[StreamingCallbackT] = None,
        api_base_url: Optional[str] = None,
        organization: Optional[str] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        tools: Optional[Union[list[Tool], Toolset, list[Toolset]]] = None,
        tools_strict: bool = False,
        http_client_kwargs: Optional[dict[str, Any]] = None,
    ):
        """
        Creates an instance of OpenAIChatGenerator. Unless specified otherwise in `model`, uses OpenAI's gpt-4o-mini.

        Before initializing the component, you can set the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES'
        environment variables to override the `timeout` and `max_retries` parameters respectively
        in the OpenAI client.

        :param api_key: The OpenAI API key.
            You can set it with the environment variable `OPENAI_API_KEY`, or pass it with this parameter
            during initialization.
        :param model: The name of the model to use.
        :param streaming_callback: A callback function that is called when a new token is received from the stream.
            The callback function accepts [StreamingChunk](https://docs.haystack.deepset.ai/docs/data-classes#streamingchunk)
            as an argument.
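            A minimal sketch of a custom callback; each `StreamingChunk` carries the newly received text
            in its `content` attribute:
            ```python
            from haystack.dataclasses import StreamingChunk

            def on_chunk(chunk: StreamingChunk) -> None:
                print(chunk.content, end="", flush=True)

            client = OpenAIChatGenerator(streaming_callback=on_chunk)
            ```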
        :param api_base_url: An optional base URL.
        :param organization: Your organization ID, defaults to `None`. See
            [production best practices](https://platform.openai.com/docs/guides/production-best-practices/setting-up-your-organization).
        :param generation_kwargs: Other parameters to use for the model. These parameters are sent directly to
            the OpenAI endpoint. See the OpenAI [documentation](https://platform.openai.com/docs/api-reference/chat) for
            more details.
            Some of the supported parameters:
            - `max_completion_tokens`: An upper bound for the number of tokens that can be generated for a completion,
                including visible output tokens and reasoning tokens.
            - `temperature`: What sampling temperature to use. Higher values mean the model will take more risks.
                Try 0.9 for more creative applications and 0 (argmax sampling) for ones with a well-defined answer.
            - `top_p`: An alternative to sampling with temperature, called nucleus sampling, where the model
                considers the results of the tokens with top_p probability mass. For example, 0.1 means only the tokens
                comprising the top 10% probability mass are considered.
            - `n`: How many completions to generate for each prompt. For example, if the LLM gets 3 prompts and n is 2,
                it will generate two completions for each of the three prompts, ending up with 6 completions in total.
            - `stop`: One or more sequences after which the LLM should stop generating tokens.
            - `presence_penalty`: The penalty to apply if a token has already appeared in the text at all.
                Bigger values mean the model is less likely to repeat the same token.
            - `frequency_penalty`: The penalty to apply if a token has already been generated in the text,
                scaled by how often it has appeared. Bigger values mean the model is less likely to repeat
                the same token.
            - `logit_bias`: Add a logit bias to specific tokens. The keys of the dictionary are tokens, and the
                values are the bias to add to that token.
            - `response_format`: A JSON schema or a Pydantic model that enforces the structure of the model's response.
                If provided, the output will always be validated against this
                format (unless the model returns a tool call).
                For details, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs).
                Notes:
                - This parameter accepts Pydantic models and JSON schemas for the latest models, starting from GPT-4o.
                  Older models only support a basic version of structured outputs through `{"type": "json_object"}`.
                  For detailed information on JSON mode, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs#json-mode).
                - For structured outputs with streaming,
                  the `response_format` must be a JSON schema and not a Pydantic model.
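                A minimal sketch of structured output with a Pydantic model (`CityInfo` is a hypothetical
                example model):
                ```python
                from pydantic import BaseModel

                class CityInfo(BaseModel):
                    city: str
                    country: str

                generator = OpenAIChatGenerator(generation_kwargs={"response_format": CityInfo})
                ```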
        :param timeout:
            Timeout for OpenAI client calls. If not set, it defaults to either the
            `OPENAI_TIMEOUT` environment variable, or 30 seconds.
        :param max_retries:
            Maximum number of retries to contact OpenAI after an internal error.
            If not set, it defaults to either the `OPENAI_MAX_RETRIES` environment variable, or 5.
        :param tools:
            A list of tools, a Toolset, or a list of Toolset instances for which the model can prepare calls.
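            A minimal sketch of passing a list of Toolset instances; the `add` function and its schema
            are hypothetical examples:
            ```python
            from haystack.tools import Tool, Toolset

            def add(a: int, b: int) -> int:
                return a + b

            add_tool = Tool(
                name="add",
                description="Add two integers.",
                parameters={
                    "type": "object",
                    "properties": {"a": {"type": "integer"}, "b": {"type": "integer"}},
                    "required": ["a", "b"],
                },
                function=add,
            )
            generator = OpenAIChatGenerator(tools=[Toolset([add_tool])])
            ```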
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
        :param http_client_kwargs:
            A dictionary of keyword arguments to configure a custom `httpx.Client` or `httpx.AsyncClient`.
            For more information, see the [HTTPX documentation](https://www.python-httpx.org/api/#client).
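            A minimal sketch of routing requests through a local proxy (assumes a recent httpx version,
            where `httpx.Client` accepts a `proxy` keyword):
            ```python
            generator = OpenAIChatGenerator(http_client_kwargs={"proxy": "http://localhost:8080"})
            ```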

        """
        self.api_key = api_key
        self.model = model
        self.generation_kwargs = generation_kwargs or {}
        self.streaming_callback = streaming_callback
        self.api_base_url = api_base_url
        self.organization = organization
        self.timeout = timeout
        self.max_retries = max_retries
        self.tools = tools  # Store tools as-is, whether it's a list or a Toolset
        self.tools_strict = tools_strict
        self.http_client_kwargs = http_client_kwargs
        # Check for duplicate tool names
        _check_duplicate_tool_names(flatten_tools_or_toolsets(self.tools))

        if timeout is None:
            timeout = float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
        if max_retries is None:
            max_retries = int(os.environ.get("OPENAI_MAX_RETRIES", "5"))

        client_kwargs: dict[str, Any] = {
            "api_key": api_key.resolve_value(),
            "organization": organization,
            "base_url": api_base_url,
            "timeout": timeout,
            "max_retries": max_retries,
        }

        self.client = OpenAI(http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_kwargs)
        self.async_client = AsyncOpenAI(
            http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_kwargs
        )

    def _get_telemetry_data(self) -> dict[str, Any]:
        """
        Data that is sent to Posthog for usage analytics.
        """
        return {"model": self.model}

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize this component to a dictionary.

        :returns:
            The serialized component as a dictionary.
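
        A serialization round trip (a minimal sketch, assuming the `OPENAI_API_KEY` environment
        variable is set):
        ```python
        generator = OpenAIChatGenerator()
        data = generator.to_dict()
        restored = OpenAIChatGenerator.from_dict(data)
        ```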
        """
        callback_name = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        generation_kwargs = self.generation_kwargs.copy()
        response_format = generation_kwargs.get("response_format")

        # If the response format is a Pydantic model, it's converted to OpenAI's JSON schema format
        # If it's already a JSON schema, it's left as is
        if response_format and isinstance(response_format, type) and issubclass(response_format, BaseModel):
            json_schema = {
                "type": "json_schema",
                "json_schema": {
                    "name": response_format.__name__,
                    "strict": True,
                    "schema": to_strict_json_schema(response_format),
                },
            }
            generation_kwargs["response_format"] = json_schema

        return default_to_dict(
            self,
            model=self.model,
            streaming_callback=callback_name,
            api_base_url=self.api_base_url,
            organization=self.organization,
            generation_kwargs=generation_kwargs,
            api_key=self.api_key.to_dict(),
            timeout=self.timeout,
            max_retries=self.max_retries,
            tools=serialize_tools_or_toolset(self.tools),
            tools_strict=self.tools_strict,
            http_client_kwargs=self.http_client_kwargs,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "OpenAIChatGenerator":
        """
        Deserialize this component from a dictionary.

        :param data: The dictionary representation of this component.
        :returns:
            The deserialized component instance.
        """
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
        deserialize_tools_or_toolset_inplace(data["init_parameters"], key="tools")
        init_params = data.get("init_parameters", {})
        serialized_callback_handler = init_params.get("streaming_callback")

        if serialized_callback_handler:
            data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)
        return default_from_dict(cls, data)

    @component.output_types(replies=list[ChatMessage])
    def run(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        *,
        tools: Optional[Union[list[Tool], Toolset, list[Toolset]]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Invokes chat completion based on the provided messages and generation parameters.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat/create).
        :param tools:
            A list of tools, a Toolset, or a list of Toolset instances for which the model can prepare calls.
            If set, it will override the `tools` parameter set during component initialization.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
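
        Example of overriding generation parameters at runtime (a minimal sketch):
        ```python
        client = OpenAIChatGenerator()
        result = client.run(
            messages=[ChatMessage.from_user("What's Natural Language Processing?")],
            generation_kwargs={"temperature": 0.2, "max_completion_tokens": 64},
        )
        print(result["replies"][0].text)
        ```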
        """
        if len(messages) == 0:
            return {"replies": []}

        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
        )
        chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion, ParsedChatCompletion]

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )
        openai_endpoint = api_args.pop("openai_endpoint")
        openai_endpoint_method = getattr(self.client.chat.completions, openai_endpoint)
        chat_completion = openai_endpoint_method(**api_args)

        if streaming_callback is not None:
            completions = self._handle_stream_response(
                # we cannot check isinstance(chat_completion, Stream) because some observability tools wrap Stream
                # and return a different type. See https://github.com/deepset-ai/haystack/issues/9014.
                chat_completion,  # type: ignore
                streaming_callback,
            )

        else:
            assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
            completions = [
                _convert_chat_completion_to_chat_message(chat_completion, choice) for choice in chat_completion.choices
            ]

        # before returning, do post-processing of the completions
        for message in completions:
            _check_finish_reason(message.meta)

        return {"replies": completions}

    @component.output_types(replies=list[ChatMessage])
    async def run_async(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        *,
        tools: Optional[Union[list[Tool], Toolset, list[Toolset]]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Asynchronously invokes chat completion based on the provided messages and generation parameters.

        This is the asynchronous version of the `run` method. It has the same parameters and return values
        but can be used with `await` in async code.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
            Must be a coroutine.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat/create).
        :param tools:
            A list of tools, a Toolset, or a list of Toolset instances for which the model can prepare calls.
            If set, it will override the `tools` parameter set during component initialization.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
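
        Example (a minimal sketch):
        ```python
        import asyncio

        async def main():
            client = OpenAIChatGenerator()
            result = await client.run_async([ChatMessage.from_user("What's Natural Language Processing?")])
            print(result["replies"][0].text)

        asyncio.run(main())
        ```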
        """
        # validate and select the streaming callback
        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
        )
        chat_completion: Union[AsyncStream[ChatCompletionChunk], ChatCompletion, ParsedChatCompletion]

        if len(messages) == 0:
            return {"replies": []}

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )

        openai_endpoint = api_args.pop("openai_endpoint")
        openai_endpoint_method = getattr(self.async_client.chat.completions, openai_endpoint)
        chat_completion = await openai_endpoint_method(**api_args)

        if streaming_callback is not None:
            completions = await self._handle_async_stream_response(
                # we cannot check isinstance(chat_completion, AsyncStream) because some observability tools wrap
                # AsyncStream and return a different type. See https://github.com/deepset-ai/haystack/issues/9014.
                chat_completion,  # type: ignore
                streaming_callback,
            )

        else:
            assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
            completions = [
                _convert_chat_completion_to_chat_message(chat_completion, choice) for choice in chat_completion.choices
            ]

        # before returning, do post-processing of the completions
        for message in completions:
            _check_finish_reason(message.meta)

        return {"replies": completions}

    def _prepare_api_call(  # noqa: PLR0913
        self,
        *,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[list[Tool], Toolset, list[Toolset]]] = None,
        tools_strict: Optional[bool] = None,
    ) -> dict[str, Any]:
        """
        Assemble the keyword arguments for the OpenAI chat completions call.

        The returned dict contains an extra `openai_endpoint` key ("parse" or "create") that hints to the
        caller which endpoint to use; this key is removed before the API call is made.
        """
        # update generation kwargs by merging with the generation kwargs passed to the run method
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}

        is_streaming = streaming_callback is not None
        num_responses = generation_kwargs.pop("n", 1)

        if is_streaming and num_responses > 1:
            raise ValueError("Cannot stream multiple responses, please set n=1.")
        response_format = generation_kwargs.pop("response_format", None)

        # adapt ChatMessage(s) to the format expected by the OpenAI API
        openai_formatted_messages = [message.to_openai_dict_format() for message in messages]

        tools = flatten_tools_or_toolsets(tools or self.tools)
        tools_strict = tools_strict if tools_strict is not None else self.tools_strict
        _check_duplicate_tool_names(tools)

        openai_tools = {}
        if tools:
            tool_definitions = []
            for t in tools:
                function_spec = {**t.tool_spec}
                if tools_strict:
                    function_spec["strict"] = True
                    function_spec["parameters"]["additionalProperties"] = False
                tool_definitions.append({"type": "function", "function": function_spec})
            openai_tools = {"tools": tool_definitions}

        base_args = {
            "model": self.model,
            "messages": openai_formatted_messages,
            "n": num_responses,
            **openai_tools,
            **generation_kwargs,
        }

        if response_format and not is_streaming:
            # for structured outputs without streaming, we use OpenAI's parse endpoint
            # Note: `stream` cannot be passed to chat.completions.parse
            # we pass a key `openai_endpoint` as a hint to the run method to use the parse endpoint
            # this key will be removed before the API call is made
            return {**base_args, "response_format": response_format, "openai_endpoint": "parse"}

        # for structured outputs with streaming, we use OpenAI's create endpoint
        # we pass a key `openai_endpoint` as a hint to the run method to use the create endpoint
        # this key will be removed before the API call is made
        final_args = {**base_args, "stream": is_streaming, "openai_endpoint": "create"}

        # We only set the response_format parameter if it's not None since None is not a valid value in the API.
        if response_format:
            final_args["response_format"] = response_format
        return final_args

    def _handle_stream_response(self, chat_completion: Stream, callback: SyncStreamingCallbackT) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []
        for chunk in chat_completion:  # pylint: disable=not-an-iterable
            assert len(chunk.choices) <= 1, "Streaming responses should have at most one choice."
            chunk_delta = _convert_chat_completion_chunk_to_streaming_chunk(
                chunk=chunk, previous_chunks=chunks, component_info=component_info
            )
            chunks.append(chunk_delta)
            callback(chunk_delta)
        return [_convert_streaming_chunks_to_chat_message(chunks=chunks)]

    async def _handle_async_stream_response(
        self, chat_completion: AsyncStream, callback: AsyncStreamingCallbackT
    ) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []
        async for chunk in chat_completion:  # pylint: disable=not-an-iterable
            assert len(chunk.choices) <= 1, "Streaming responses should have at most one choice."
            chunk_delta = _convert_chat_completion_chunk_to_streaming_chunk(
                chunk=chunk, previous_chunks=chunks, component_info=component_info
            )
            chunks.append(chunk_delta)
            await callback(chunk_delta)
        return [_convert_streaming_chunks_to_chat_message(chunks=chunks)]


def _check_finish_reason(meta: dict[str, Any]) -> None:
    if meta["finish_reason"] == "length":
        logger.warning(
            "The completion for index {index} has been truncated before reaching a natural stopping point. "
            "Increase the max_completion_tokens parameter to allow for longer completions.",
            index=meta["index"],
            finish_reason=meta["finish_reason"],
        )
    if meta["finish_reason"] == "content_filter":
        logger.warning(
            "The completion for index {index} has been truncated due to the content filter.",
            index=meta["index"],
            finish_reason=meta["finish_reason"],
        )


def _convert_chat_completion_to_chat_message(
    completion: Union[ChatCompletion, ParsedChatCompletion], choice: Choice
) -> ChatMessage:
    """
    Converts the non-streaming response from the OpenAI API to a ChatMessage.

    :param completion: The completion returned by the OpenAI API.
    :param choice: The choice returned by the OpenAI API.
    :return: The ChatMessage.
    """
    message: Union[ChatCompletionMessage, ParsedChatCompletionMessage] = choice.message
    text = message.content
    tool_calls = []
    if message.tool_calls:
        # we currently only support function tools (not custom tools)
        # https://platform.openai.com/docs/guides/function-calling#custom-tools
        openai_tool_calls = [tc for tc in message.tool_calls if not isinstance(tc, ChatCompletionMessageCustomToolCall)]
        for openai_tc in openai_tool_calls:
            arguments_str = openai_tc.function.arguments
            try:
                arguments = json.loads(arguments_str)
                tool_calls.append(ToolCall(id=openai_tc.id, tool_name=openai_tc.function.name, arguments=arguments))
            except json.JSONDecodeError:
                logger.warning(
                    "OpenAI returned a malformed JSON string for tool call arguments. This tool call "
                    "will be skipped. To always generate valid JSON, set `tools_strict` to `True`. "
                    "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                    _id=openai_tc.id,
                    _name=openai_tc.function.name,
                    _arguments=arguments_str,
                )

    chat_message = ChatMessage.from_assistant(
        text=text,
        tool_calls=tool_calls,
        meta={
            "model": completion.model,
            "index": choice.index,
            "finish_reason": choice.finish_reason,
            "usage": _serialize_usage(completion.usage),
        },
    )

    return chat_message


def _convert_chat_completion_chunk_to_streaming_chunk(
    chunk: ChatCompletionChunk, previous_chunks: list[StreamingChunk], component_info: Optional[ComponentInfo] = None
) -> StreamingChunk:
    """
    Converts the streaming response chunk from the OpenAI API to a StreamingChunk.

    :param chunk: The chunk returned by the OpenAI API.
    :param previous_chunks: A list of previously received StreamingChunks.
    :param component_info: An optional `ComponentInfo` object containing information about the component that
        generated the chunk, such as the component name and type.

    :returns:
        A StreamingChunk object representing the content of the chunk from the OpenAI API.
    """
    finish_reason_mapping: dict[str, FinishReason] = {
        "stop": "stop",
        "length": "length",
        "content_filter": "content_filter",
        "tool_calls": "tool_calls",
        "function_call": "tool_calls",
    }
    # On the very first chunk (len(previous_chunks) == 0), the choices field only provides role info
    # (e.g. "assistant"). choices is empty on the final chunk if include_usage is set to True, where only
    # the usage information is returned.
    if len(chunk.choices) == 0:
        return StreamingChunk(
            content="",
            component_info=component_info,
            # Index is None since it's only set to an int when a content block is present
            index=None,
            finish_reason=None,
            meta={
                "model": chunk.model,
                "received_at": datetime.now().isoformat(),
                "usage": _serialize_usage(chunk.usage),
            },
        )

    choice: ChunkChoice = chunk.choices[0]

    # create a list of ToolCallDelta objects from the tool calls
    if choice.delta.tool_calls:
        tool_calls_deltas = []
        for tool_call in choice.delta.tool_calls:
            function = tool_call.function
            tool_calls_deltas.append(
                ToolCallDelta(
                    index=tool_call.index,
                    id=tool_call.id,
                    tool_name=function.name if function else None,
                    arguments=function.arguments if function and function.arguments else None,
                )
            )
        chunk_message = StreamingChunk(
            content=choice.delta.content or "",
            component_info=component_info,
            # We adopt the index of the first ToolCallDelta as the overall index of the chunk.
            index=tool_calls_deltas[0].index,
            tool_calls=tool_calls_deltas,
            start=tool_calls_deltas[0].tool_name is not None,
            finish_reason=finish_reason_mapping.get(choice.finish_reason) if choice.finish_reason else None,
            meta={
                "model": chunk.model,
                "index": choice.index,
                "tool_calls": choice.delta.tool_calls,
                "finish_reason": choice.finish_reason,
                "received_at": datetime.now().isoformat(),
                "usage": _serialize_usage(chunk.usage),
            },
        )
        return chunk_message

    # On the very first chunk, the choice field only provides role info (e.g. "assistant"), so we set index to None.
    # We also set index to None for chunks missing the content field, e.g. chunks that only contain a finish
    # reason.
    if choice.delta.content is None or choice.delta.role is not None:
        resolved_index = None
    else:
        # We set the index to 0 since, if text content is being streamed, no tool calls are being streamed
        # NOTE: We may need to revisit this if OpenAI allows planning/thinking content before tool calls like
        #       Anthropic Claude
        resolved_index = 0
    chunk_message = StreamingChunk(
        content=choice.delta.content or "",
        component_info=component_info,
        index=resolved_index,
        # The first chunk is always a start message chunk that only contains role information, so if we reach here
        # and previous_chunks has length 1, then this is the start of text content.
        start=len(previous_chunks) == 1,
        finish_reason=finish_reason_mapping.get(choice.finish_reason) if choice.finish_reason else None,
        meta={
            "model": chunk.model,
            "index": choice.index,
            "tool_calls": choice.delta.tool_calls,
            "finish_reason": choice.finish_reason,
            "received_at": datetime.now().isoformat(),
            "usage": _serialize_usage(chunk.usage),
        },
    )
    return chunk_message


def _serialize_usage(usage):
    """Recursively convert an OpenAI usage object to a serializable dict."""
    if hasattr(usage, "model_dump"):
        return usage.model_dump()
    elif hasattr(usage, "__dict__"):
        return {k: _serialize_usage(v) for k, v in usage.__dict__.items() if not k.startswith("_")}
    elif isinstance(usage, dict):
        return {k: _serialize_usage(v) for k, v in usage.items()}
    elif isinstance(usage, list):
        return [_serialize_usage(item) for item in usage]
    else:
        return usage