
deepset-ai / haystack · build 17092697650

20 Aug 2025 08:17AM UTC · coverage: 92.167% (remained the same)

Pull Request #9725: fix: `OpenAIChatGenerator` - improve the logic to exclude custom tool calls
Merge ccc4d885b into 9fae8e392

12907 of 14004 relevant lines covered (92.17%) · 0.92 hits per line

Source file: haystack/components/generators/chat/openai.py (96.41% file coverage)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import json
import os
from datetime import datetime
from typing import Any, Optional, Union

from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from openai.types.chat import (
    ChatCompletion,
    ChatCompletionChunk,
    ChatCompletionMessage,
    ChatCompletionMessageCustomToolCall,
)
from openai.types.chat.chat_completion import Choice
from openai.types.chat.chat_completion_chunk import Choice as ChunkChoice

from haystack import component, default_from_dict, default_to_dict, logging
from haystack.components.generators.utils import _convert_streaming_chunks_to_chat_message
from haystack.dataclasses import (
    AsyncStreamingCallbackT,
    ChatMessage,
    ComponentInfo,
    FinishReason,
    StreamingCallbackT,
    StreamingChunk,
    SyncStreamingCallbackT,
    ToolCall,
    ToolCallDelta,
    select_streaming_callback,
)
from haystack.tools import (
    Tool,
    Toolset,
    _check_duplicate_tool_names,
    deserialize_tools_or_toolset_inplace,
    serialize_tools_or_toolset,
)
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable
from haystack.utils.http_client import init_http_client

logger = logging.getLogger(__name__)

@component
class OpenAIChatGenerator:
    """
    Completes chats using OpenAI's large language models (LLMs).

    It works with the gpt-4 and o-series models and supports streaming responses
    from the OpenAI API. It uses the [ChatMessage](https://docs.haystack.deepset.ai/docs/chatmessage)
    format for input and output.

    You can customize how the text is generated by passing parameters to the
    OpenAI API. Use the `**generation_kwargs` argument when you initialize
    the component or when you run it. Any parameter that works with
    `openai.ChatCompletion.create` will work here too.

    For details on OpenAI API parameters, see
    [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat).

    ### Usage example

    ```python
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.dataclasses import ChatMessage

    messages = [ChatMessage.from_user("What's Natural Language Processing?")]

    client = OpenAIChatGenerator()
    response = client.run(messages)
    print(response)
    ```
    Output:
    ```
    {'replies':
        [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=
        [TextContent(text="Natural Language Processing (NLP) is a branch of artificial intelligence
            that focuses on enabling computers to understand, interpret, and generate human language in
            a way that is meaningful and useful.")],
         _name=None,
         _meta={'model': 'gpt-4o-mini', 'index': 0, 'finish_reason': 'stop',
         'usage': {'prompt_tokens': 15, 'completion_tokens': 36, 'total_tokens': 51}})
        ]
    }
    ```
    """

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-4o-mini",
        streaming_callback: Optional[StreamingCallbackT] = None,
        api_base_url: Optional[str] = None,
        organization: Optional[str] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        tools: Optional[Union[list[Tool], Toolset]] = None,
        tools_strict: bool = False,
        http_client_kwargs: Optional[dict[str, Any]] = None,
    ):
        """
        Creates an instance of OpenAIChatGenerator. Unless specified otherwise in `model`, uses OpenAI's gpt-4o-mini.

        Before initializing the component, you can set the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES'
        environment variables to override the `timeout` and `max_retries` parameters respectively
        in the OpenAI client.

        :param api_key: The OpenAI API key.
            You can set it with the `OPENAI_API_KEY` environment variable or pass it with this parameter
            during initialization.
        :param model: The name of the model to use.
        :param streaming_callback: A callback function that is called when a new token is received from the stream.
            The callback function accepts [StreamingChunk](https://docs.haystack.deepset.ai/docs/data-classes#streamingchunk)
            as an argument.
        :param api_base_url: An optional base URL.
        :param organization: Your organization ID, defaults to `None`. See
            [production best practices](https://platform.openai.com/docs/guides/production-best-practices/setting-up-your-organization).
        :param generation_kwargs: Other parameters to use for the model. These parameters are sent directly to
            the OpenAI endpoint. See the OpenAI [documentation](https://platform.openai.com/docs/api-reference/chat) for
            more details.
            Some of the supported parameters:
            - `max_tokens`: The maximum number of tokens the output text can have.
            - `temperature`: The sampling temperature to use. Higher values mean the model will take more risks.
                Try 0.9 for more creative applications and 0 (argmax sampling) for ones with a well-defined answer.
            - `top_p`: An alternative to sampling with temperature, called nucleus sampling, where the model
                considers the results of the tokens with top_p probability mass. For example, 0.1 means only the tokens
                comprising the top 10% probability mass are considered.
            - `n`: How many completions to generate for each prompt. For example, if the LLM gets 3 prompts and n is 2,
                it will generate two completions for each of the three prompts, ending up with 6 completions in total.
            - `stop`: One or more sequences after which the LLM should stop generating tokens.
            - `presence_penalty`: The penalty to apply if a token is already present. Bigger values make
                the model less likely to repeat the same token in the text.
            - `frequency_penalty`: The penalty to apply if a token has already been generated in the text.
                Bigger values make the model less likely to repeat the same token in the text.
            - `logit_bias`: Adds a logit bias to specific tokens. The keys of the dictionary are tokens, and the
                values are the bias to add to that token.
        :param timeout:
            Timeout for OpenAI client calls. If not set, it defaults to either the
            `OPENAI_TIMEOUT` environment variable or 30 seconds.
        :param max_retries:
            Maximum number of retries to contact OpenAI after an internal error.
            If not set, it defaults to either the `OPENAI_MAX_RETRIES` environment variable or 5.
        :param tools:
            A list of tools or a Toolset for which the model can prepare calls. This parameter can accept either a
            list of `Tool` objects or a `Toolset` instance.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
        :param http_client_kwargs:
            A dictionary of keyword arguments to configure a custom `httpx.Client` or `httpx.AsyncClient`.
            For more information, see the [HTTPX documentation](https://www.python-httpx.org/api/#client).
        """
        self.api_key = api_key
        self.model = model
        self.generation_kwargs = generation_kwargs or {}
        self.streaming_callback = streaming_callback
        self.api_base_url = api_base_url
        self.organization = organization
        self.timeout = timeout
        self.max_retries = max_retries
        self.tools = tools  # Store tools as-is, whether it's a list or a Toolset
        self.tools_strict = tools_strict
        self.http_client_kwargs = http_client_kwargs
        # Check for duplicate tool names
        _check_duplicate_tool_names(list(self.tools or []))

        if timeout is None:
            timeout = float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
        if max_retries is None:
            max_retries = int(os.environ.get("OPENAI_MAX_RETRIES", "5"))

        client_kwargs: dict[str, Any] = {
            "api_key": api_key.resolve_value(),
            "organization": organization,
            "base_url": api_base_url,
            "timeout": timeout,
            "max_retries": max_retries,
        }

        self.client = OpenAI(http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_kwargs)
        self.async_client = AsyncOpenAI(
            http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_kwargs
        )
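
A minimal construction sketch (not part of the original file), assuming `OPENAI_API_KEY` is exported; it shows how `generation_kwargs` and an explicit `timeout` interact with the environment-variable fallbacks above:

```python
from haystack.components.generators.chat import OpenAIChatGenerator

# Sketch only: assumes OPENAI_API_KEY is set in the environment.
generator = OpenAIChatGenerator(
    model="gpt-4o-mini",
    generation_kwargs={"temperature": 0.2, "max_tokens": 256},  # sent with every request
    timeout=60.0,  # explicit value; otherwise OPENAI_TIMEOUT or 30.0 is used
)
```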

    def _get_telemetry_data(self) -> dict[str, Any]:
        """
        Data that is sent to Posthog for usage analytics.
        """
        return {"model": self.model}

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize this component to a dictionary.

        :returns:
            The serialized component as a dictionary.
        """
        callback_name = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        return default_to_dict(
            self,
            model=self.model,
            streaming_callback=callback_name,
            api_base_url=self.api_base_url,
            organization=self.organization,
            generation_kwargs=self.generation_kwargs,
            api_key=self.api_key.to_dict(),
            timeout=self.timeout,
            max_retries=self.max_retries,
            tools=serialize_tools_or_toolset(self.tools),
            tools_strict=self.tools_strict,
            http_client_kwargs=self.http_client_kwargs,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "OpenAIChatGenerator":
        """
        Deserialize this component from a dictionary.

        :param data: The dictionary representation of this component.
        :returns:
            The deserialized component instance.
        """
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
        deserialize_tools_or_toolset_inplace(data["init_parameters"], key="tools")
        init_params = data.get("init_parameters", {})
        serialized_callback_handler = init_params.get("streaming_callback")
        if serialized_callback_handler:
            data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)
        return default_from_dict(cls, data)
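
A hedged round-trip sketch: `to_dict` stores only the env-var reference for the API key (via `Secret.to_dict`), so `OPENAI_API_KEY` must be set again when `from_dict` runs.

```python
from haystack.components.generators.chat import OpenAIChatGenerator

# Assumes OPENAI_API_KEY is set; only the env-var reference is serialized, never the key itself.
generator = OpenAIChatGenerator(generation_kwargs={"temperature": 0.2})
data = generator.to_dict()
restored = OpenAIChatGenerator.from_dict(data)
assert restored.generation_kwargs == {"temperature": 0.2}
```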

    @component.output_types(replies=list[ChatMessage])
    def run(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        *,
        tools: Optional[Union[list[Tool], Toolset]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Invokes chat completion based on the provided messages and generation parameters.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat/create).
        :param tools:
            A list of tools or a Toolset for which the model can prepare calls. If set, it will override the
            `tools` parameter set during component initialization. This parameter can accept either a list of
            `Tool` objects or a `Toolset` instance.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
        """
        if len(messages) == 0:
            return {"replies": []}

        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
        )

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )
        chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion] = self.client.chat.completions.create(
            **api_args
        )

        if streaming_callback is not None:
            completions = self._handle_stream_response(
                # we cannot check isinstance(chat_completion, Stream) because some observability tools wrap Stream
                # and return a different type. See https://github.com/deepset-ai/haystack/issues/9014.
                chat_completion,  # type: ignore
                streaming_callback,
            )

        else:
            assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
            completions = [
                _convert_chat_completion_to_chat_message(chat_completion, choice) for choice in chat_completion.choices
            ]

        # before returning, do post-processing of the completions
        for message in completions:
            _check_finish_reason(message.meta)

        return {"replies": completions}
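
A short streaming sketch against the public API, assuming `OPENAI_API_KEY` is set; `print_streaming_chunk` is the ready-made callback shipped in `haystack.components.generators.utils`.

```python
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.generators.utils import print_streaming_chunk
from haystack.dataclasses import ChatMessage

client = OpenAIChatGenerator(streaming_callback=print_streaming_chunk)
# Each StreamingChunk is printed as it arrives; run() still returns the assembled reply.
result = client.run([ChatMessage.from_user("Summarize NLP in one sentence.")])
print(result["replies"][0].text)
```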

    @component.output_types(replies=list[ChatMessage])
    async def run_async(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        *,
        tools: Optional[Union[list[Tool], Toolset]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Asynchronously invokes chat completion based on the provided messages and generation parameters.

        This is the asynchronous version of the `run` method. It has the same parameters and return values
        but can be used with `await` in async code.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
            Must be a coroutine.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat/create).
        :param tools:
            A list of tools or a Toolset for which the model can prepare calls. If set, it will override the
            `tools` parameter set during component initialization. This parameter can accept either a list of
            `Tool` objects or a `Toolset` instance.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
        """
        # validate and select the streaming callback
        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
        )

        if len(messages) == 0:
            return {"replies": []}

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )

        chat_completion: Union[
            AsyncStream[ChatCompletionChunk], ChatCompletion
        ] = await self.async_client.chat.completions.create(**api_args)

        if streaming_callback is not None:
            completions = await self._handle_async_stream_response(
                # we cannot check isinstance(chat_completion, AsyncStream) because some observability tools wrap
                # AsyncStream and return a different type. See https://github.com/deepset-ai/haystack/issues/9014.
                chat_completion,  # type: ignore
                streaming_callback,
            )

        else:
            assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
            completions = [
                _convert_chat_completion_to_chat_message(chat_completion, choice) for choice in chat_completion.choices
            ]

        # before returning, do post-processing of the completions
        for message in completions:
            _check_finish_reason(message.meta)

        return {"replies": completions}
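
An asyncio sketch of the same flow, assuming `OPENAI_API_KEY` is set. Note that a `streaming_callback` passed here would have to be a coroutine, since the callback is selected with `requires_async=True`.

```python
import asyncio

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage


async def main() -> None:
    client = OpenAIChatGenerator()
    result = await client.run_async([ChatMessage.from_user("What is Haystack?")])
    print(result["replies"][0].text)


asyncio.run(main())
```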

    def _prepare_api_call(  # noqa: PLR0913
        self,
        *,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[list[Tool], Toolset]] = None,
        tools_strict: Optional[bool] = None,
    ) -> dict[str, Any]:
        # update generation kwargs by merging with the generation kwargs passed to the run method
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}

        # adapt ChatMessage(s) to the format expected by the OpenAI API
        openai_formatted_messages = [message.to_openai_dict_format() for message in messages]

        tools = tools or self.tools
        if isinstance(tools, Toolset):
            tools = list(tools)
        tools_strict = tools_strict if tools_strict is not None else self.tools_strict
        _check_duplicate_tool_names(tools)

        openai_tools = {}
        if tools:
            tool_definitions = []
            for t in tools:
                function_spec = {**t.tool_spec}
                if tools_strict:
                    function_spec["strict"] = True
                    function_spec["parameters"]["additionalProperties"] = False
                tool_definitions.append({"type": "function", "function": function_spec})
            openai_tools = {"tools": tool_definitions}

        is_streaming = streaming_callback is not None
        num_responses = generation_kwargs.pop("n", 1)
        if is_streaming and num_responses > 1:
            raise ValueError("Cannot stream multiple responses, please set n=1.")

        return {
            "model": self.model,
            "messages": openai_formatted_messages,
            "stream": streaming_callback is not None,
            "n": num_responses,
            **openai_tools,
            **generation_kwargs,
        }
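
To see what this method builds, here is a hedged end-to-end sketch with a made-up `get_weather` tool (name, schema, and function are illustrative; assumes `OPENAI_API_KEY` is set). With `tools_strict=True`, each function spec is sent with `strict=True` and `additionalProperties=False`, as in the loop above.

```python
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools import Tool


def get_weather(city: str) -> str:  # illustrative tool function
    return f"Sunny in {city}"


weather_tool = Tool(
    name="get_weather",
    description="Get the current weather for a city.",
    parameters={"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]},
    function=get_weather,
)

client = OpenAIChatGenerator(tools=[weather_tool], tools_strict=True)
reply = client.run([ChatMessage.from_user("What's the weather in Berlin?")])["replies"][0]
print(reply.tool_calls)  # e.g. [ToolCall(tool_name='get_weather', arguments={'city': 'Berlin'}, ...)]
```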

    def _handle_stream_response(self, chat_completion: Stream, callback: SyncStreamingCallbackT) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []
        for chunk in chat_completion:  # pylint: disable=not-an-iterable
            assert len(chunk.choices) <= 1, "Streaming responses should have at most one choice."
            chunk_delta = _convert_chat_completion_chunk_to_streaming_chunk(
                chunk=chunk, previous_chunks=chunks, component_info=component_info
            )
            chunks.append(chunk_delta)
            callback(chunk_delta)
        return [_convert_streaming_chunks_to_chat_message(chunks=chunks)]

    async def _handle_async_stream_response(
        self, chat_completion: AsyncStream, callback: AsyncStreamingCallbackT
    ) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []
        async for chunk in chat_completion:  # pylint: disable=not-an-iterable
            assert len(chunk.choices) <= 1, "Streaming responses should have at most one choice."
            chunk_delta = _convert_chat_completion_chunk_to_streaming_chunk(
                chunk=chunk, previous_chunks=chunks, component_info=component_info
            )
            chunks.append(chunk_delta)
            await callback(chunk_delta)
        return [_convert_streaming_chunks_to_chat_message(chunks=chunks)]
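
A minimal custom-callback sketch: the chunks handed to the callback are the same `StreamingChunk` objects that `_convert_streaming_chunks_to_chat_message` later folds into the single returned `ChatMessage` (assumes `OPENAI_API_KEY` is set).

```python
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage, StreamingChunk

collected: list[str] = []


def collect_chunk(chunk: StreamingChunk) -> None:
    collected.append(chunk.content)


client = OpenAIChatGenerator()
result = client.run(
    [ChatMessage.from_user("Count from 1 to 5.")],
    streaming_callback=collect_chunk,
)
print("".join(collected))          # incremental text, accumulated chunk by chunk
print(result["replies"][0].text)   # the same text, assembled into one ChatMessage
```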

def _check_finish_reason(meta: dict[str, Any]) -> None:
    if meta["finish_reason"] == "length":
        logger.warning(
            "The completion for index {index} has been truncated before reaching a natural stopping point. "
            "Increase the max_tokens parameter to allow for longer completions.",
            index=meta["index"],
            finish_reason=meta["finish_reason"],
        )
    if meta["finish_reason"] == "content_filter":
        logger.warning(
            "The completion for index {index} has been truncated due to the content filter.",
            index=meta["index"],
            finish_reason=meta["finish_reason"],
        )

def _convert_chat_completion_to_chat_message(completion: ChatCompletion, choice: Choice) -> ChatMessage:
    """
    Converts the non-streaming response from the OpenAI API to a ChatMessage.

    :param completion: The completion returned by the OpenAI API.
    :param choice: The choice returned by the OpenAI API.
    :return: The ChatMessage.
    """
    message: ChatCompletionMessage = choice.message
    text = message.content
    tool_calls = []
    if message.tool_calls:
        # we currently only support function tools (not custom tools)
        # https://platform.openai.com/docs/guides/function-calling#custom-tools
        openai_tool_calls = [tc for tc in message.tool_calls if not isinstance(tc, ChatCompletionMessageCustomToolCall)]
        for openai_tc in openai_tool_calls:
            arguments_str = openai_tc.function.arguments
            try:
                arguments = json.loads(arguments_str)
                tool_calls.append(ToolCall(id=openai_tc.id, tool_name=openai_tc.function.name, arguments=arguments))
            except json.JSONDecodeError:
                logger.warning(
                    "OpenAI returned a malformed JSON string for tool call arguments. This tool call "
                    "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                    "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                    _id=openai_tc.id,
                    _name=openai_tc.function.name,
                    _arguments=arguments_str,
                )

    chat_message = ChatMessage.from_assistant(
        text=text,
        tool_calls=tool_calls,
        meta={
            "model": completion.model,
            "index": choice.index,
            "finish_reason": choice.finish_reason,
            "usage": _serialize_usage(completion.usage),
        },
    )
    return chat_message
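
The filtering above is the focus of this PR: custom tool calls (free-form input instead of JSON-schema arguments) are dropped rather than parsed. A hand-built sketch of that check follows; the payloads are illustrative, and the model classes follow recent `openai` releases, so treat the exact field names as an assumption.

```python
from openai.types.chat import (
    ChatCompletionMessageCustomToolCall,
    ChatCompletionMessageFunctionToolCall,
)

# Illustrative payloads only.
function_call = ChatCompletionMessageFunctionToolCall(
    id="call_1",
    type="function",
    function={"name": "get_weather", "arguments": '{"city": "Berlin"}'},
)
custom_call = ChatCompletionMessageCustomToolCall(
    id="call_2",
    type="custom",
    custom={"name": "free_form_tool", "input": "any plain text"},
)

kept = [tc for tc in [function_call, custom_call] if not isinstance(tc, ChatCompletionMessageCustomToolCall)]
assert kept == [function_call]  # the custom tool call is excluded
```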

def _convert_chat_completion_chunk_to_streaming_chunk(
    chunk: ChatCompletionChunk, previous_chunks: list[StreamingChunk], component_info: Optional[ComponentInfo] = None
) -> StreamingChunk:
    """
    Converts the streaming response chunk from the OpenAI API to a StreamingChunk.

    :param chunk: The chunk returned by the OpenAI API.
    :param previous_chunks: A list of previously received StreamingChunks.
    :param component_info: An optional `ComponentInfo` object containing information about the component that
        generated the chunk, such as the component name and type.

    :returns:
        A StreamingChunk object representing the content of the chunk from the OpenAI API.
    """
    finish_reason_mapping: dict[str, FinishReason] = {
        "stop": "stop",
        "length": "length",
        "content_filter": "content_filter",
        "tool_calls": "tool_calls",
        "function_call": "tool_calls",
    }
    # On the very first chunk (len(previous_chunks) == 0), the choices field only provides role info
    # (e.g. "assistant"). choices is empty when include_usage is set to True and the chunk only carries
    # the usage information.
    if len(chunk.choices) == 0:
        return StreamingChunk(
            content="",
            component_info=component_info,
            # Index is None since it's only set to an int when a content block is present
            index=None,
            finish_reason=None,
            meta={
                "model": chunk.model,
                "received_at": datetime.now().isoformat(),
                "usage": _serialize_usage(chunk.usage),
            },
        )

    choice: ChunkChoice = chunk.choices[0]

    # create a list of ToolCallDelta objects from the tool calls
    if choice.delta.tool_calls:
        tool_calls_deltas = []
        for tool_call in choice.delta.tool_calls:
            function = tool_call.function
            tool_calls_deltas.append(
                ToolCallDelta(
                    index=tool_call.index,
                    id=tool_call.id,
                    tool_name=function.name if function else None,
                    arguments=function.arguments if function and function.arguments else None,
                )
            )
        chunk_message = StreamingChunk(
            content=choice.delta.content or "",
            component_info=component_info,
            # We adopt the first tool_calls_deltas.index as the overall index of the chunk.
            index=tool_calls_deltas[0].index,
            tool_calls=tool_calls_deltas,
            start=tool_calls_deltas[0].tool_name is not None,
            finish_reason=finish_reason_mapping.get(choice.finish_reason) if choice.finish_reason else None,
            meta={
                "model": chunk.model,
                "index": choice.index,
                "tool_calls": choice.delta.tool_calls,
                "finish_reason": choice.finish_reason,
                "received_at": datetime.now().isoformat(),
                "usage": _serialize_usage(chunk.usage),
            },
        )
        return chunk_message

    # On the very first chunk, the choice field only provides role info (e.g. "assistant"), so we set index to None.
    # We also set the index to None for all chunks missing the content field, e.g. a chunk that only contains a
    # finish reason.
    if choice.delta.content is None or choice.delta.role is not None:
        resolved_index = None
    else:
        # We set the index to 0, since if text content is being streamed then no tool calls are being streamed
        # NOTE: We may need to revisit this if OpenAI allows planning/thinking content before tool calls like
        #       Anthropic Claude
        resolved_index = 0
    chunk_message = StreamingChunk(
        content=choice.delta.content or "",
        component_info=component_info,
        index=resolved_index,
        # The first chunk is always a start message chunk that only contains role information, so if we reach here
        # and previous_chunks has length 1, then this is the start of text content.
        start=len(previous_chunks) == 1,
        finish_reason=finish_reason_mapping.get(choice.finish_reason) if choice.finish_reason else None,
        meta={
            "model": chunk.model,
            "index": choice.index,
            "tool_calls": choice.delta.tool_calls,
            "finish_reason": choice.finish_reason,
            "received_at": datetime.now().isoformat(),
            "usage": _serialize_usage(chunk.usage),
        },
    )
    return chunk_message
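
A hedged sketch of the index/start bookkeeping, calling this module-private helper directly with hand-built chunks (id/model/created values are placeholders):

```python
from openai.types.chat import ChatCompletionChunk

from haystack.components.generators.chat.openai import (
    _convert_chat_completion_chunk_to_streaming_chunk,
)


def make_chunk(delta: dict) -> ChatCompletionChunk:
    # Minimal hand-built chunk; the openai pydantic models accept dict payloads.
    return ChatCompletionChunk(
        id="chatcmpl-123",
        model="gpt-4o-mini",
        object="chat.completion.chunk",
        created=1700000000,
        choices=[{"index": 0, "delta": delta, "finish_reason": None}],
    )


role_only = _convert_chat_completion_chunk_to_streaming_chunk(
    chunk=make_chunk({"role": "assistant"}), previous_chunks=[]
)
first_text = _convert_chat_completion_chunk_to_streaming_chunk(
    chunk=make_chunk({"content": "Hello"}), previous_chunks=[role_only]
)
print(role_only.index, role_only.start)    # None False  (role-only chunk)
print(first_text.index, first_text.start)  # 0 True      (start of text content)
```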

def _serialize_usage(usage):
    """Recursively convert an OpenAI usage object to a serializable dict."""
    if hasattr(usage, "model_dump"):
        return usage.model_dump()
    elif hasattr(usage, "__dict__"):
        return {k: _serialize_usage(v) for k, v in usage.__dict__.items() if not k.startswith("_")}
    elif isinstance(usage, dict):
        return {k: _serialize_usage(v) for k, v in usage.items()}
    elif isinstance(usage, list):
        return [_serialize_usage(item) for item in usage]
    else:
        return usage
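
For instance (a sketch; the exact extra detail fields in the dump depend on the installed `openai` version), usage objects normally hit the pydantic `model_dump()` branch:

```python
from openai.types.completion_usage import CompletionUsage

from haystack.components.generators.chat.openai import _serialize_usage

usage = CompletionUsage(prompt_tokens=15, completion_tokens=36, total_tokens=51)
print(_serialize_usage(usage))  # model_dump() branch -> plain dict of token counts
print(_serialize_usage(None))   # non-matching values pass through unchanged
```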