deepset-ai / haystack, Pull Request #9942: feat: Add warm_up() method to ChatGenerators for tool initialization

haystack/components/generators/chat/openai.py

# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import json
import os
from datetime import datetime
from typing import Any, Optional, Union

from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from openai.lib._pydantic import to_strict_json_schema
from openai.types.chat import (
    ChatCompletion,
    ChatCompletionChunk,
    ChatCompletionMessage,
    ChatCompletionMessageCustomToolCall,
    ParsedChatCompletion,
    ParsedChatCompletionMessage,
)
from openai.types.chat.chat_completion import Choice
from openai.types.chat.chat_completion_chunk import Choice as ChunkChoice
from pydantic import BaseModel

from haystack import component, default_from_dict, default_to_dict, logging
from haystack.components.generators.utils import _convert_streaming_chunks_to_chat_message
from haystack.dataclasses import (
    AsyncStreamingCallbackT,
    ChatMessage,
    ComponentInfo,
    FinishReason,
    StreamingCallbackT,
    StreamingChunk,
    SyncStreamingCallbackT,
    ToolCall,
    ToolCallDelta,
    select_streaming_callback,
)
from haystack.tools import (
    ToolsType,
    _check_duplicate_tool_names,
    deserialize_tools_or_toolset_inplace,
    flatten_tools_or_toolsets,
    serialize_tools_or_toolset,
    warm_up_tools,
)
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable
from haystack.utils.http_client import init_http_client

logger = logging.getLogger(__name__)


@component
class OpenAIChatGenerator:
    """
    Completes chats using OpenAI's large language models (LLMs).

    It works with the gpt-4 and o-series models and supports streaming responses
    from the OpenAI API. It uses the [ChatMessage](https://docs.haystack.deepset.ai/docs/chatmessage)
    format for input and output.

    You can customize how the text is generated by passing parameters to the
    OpenAI API. Use the `**generation_kwargs` argument when you initialize
    the component or when you run it. Any parameter that works with
    `openai.ChatCompletion.create` will work here too.

    For details on OpenAI API parameters, see
    [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat).

    ### Usage example

    ```python
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.dataclasses import ChatMessage

    messages = [ChatMessage.from_user("What's Natural Language Processing?")]

    client = OpenAIChatGenerator()
    response = client.run(messages)
    print(response)
    ```
    Output:
    ```
    {'replies':
        [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=
        [TextContent(text="Natural Language Processing (NLP) is a branch of artificial intelligence
            that focuses on enabling computers to understand, interpret, and generate human language in
            a way that is meaningful and useful.")],
         _name=None,
         _meta={'model': 'gpt-4o-mini', 'index': 0, 'finish_reason': 'stop',
         'usage': {'prompt_tokens': 15, 'completion_tokens': 36, 'total_tokens': 51}})
        ]
    }
    ```
    """
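
    # Illustrative sketch (not part of the original file): generation parameters can be
    # set when the component is initialized and overridden per call. The values below
    # are arbitrary examples.
    #
    #   client = OpenAIChatGenerator(generation_kwargs={"temperature": 0.2})
    #   response = client.run(
    #       messages, generation_kwargs={"temperature": 0.9, "max_completion_tokens": 256}
    #   )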

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-4o-mini",
        streaming_callback: Optional[StreamingCallbackT] = None,
        api_base_url: Optional[str] = None,
        organization: Optional[str] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        tools: Optional[ToolsType] = None,
        tools_strict: bool = False,
        http_client_kwargs: Optional[dict[str, Any]] = None,
    ):
        """
        Creates an instance of OpenAIChatGenerator. Unless specified otherwise in `model`, uses OpenAI's gpt-4o-mini.

        Before initializing the component, you can set the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES'
        environment variables to override the `timeout` and `max_retries` parameters respectively
        in the OpenAI client.

        :param api_key: The OpenAI API key.
            You can set it with the `OPENAI_API_KEY` environment variable, or pass it with this parameter
            during initialization.
        :param model: The name of the model to use.
        :param streaming_callback: A callback function that is called when a new token is received from the stream.
            The callback function accepts [StreamingChunk](https://docs.haystack.deepset.ai/docs/data-classes#streamingchunk)
            as an argument.
        :param api_base_url: An optional base URL.
        :param organization: Your organization ID, defaults to `None`. See
            [production best practices](https://platform.openai.com/docs/guides/production-best-practices/setting-up-your-organization).
        :param generation_kwargs: Other parameters to use for the model. These parameters are sent directly to
            the OpenAI endpoint. See the OpenAI [documentation](https://platform.openai.com/docs/api-reference/chat) for
            more details.
            Some of the supported parameters:
            - `max_completion_tokens`: An upper bound for the number of tokens that can be generated for a completion,
                including visible output tokens and reasoning tokens.
            - `temperature`: What sampling temperature to use. Higher values mean the model will take more risks.
                Try 0.9 for more creative applications and 0 (argmax sampling) for ones with a well-defined answer.
            - `top_p`: An alternative to sampling with temperature, called nucleus sampling, where the model
                considers the results of the tokens with top_p probability mass. For example, 0.1 means only the tokens
                comprising the top 10% probability mass are considered.
            - `n`: How many completions to generate for each prompt. For example, if the LLM gets 3 prompts and n is 2,
                it will generate two completions for each of the three prompts, ending up with 6 completions in total.
            - `stop`: One or more sequences after which the LLM should stop generating tokens.
            - `presence_penalty`: What penalty to apply if a token is already present in the text. Bigger values mean
                the model will be less likely to repeat the same token in the text.
            - `frequency_penalty`: What penalty to apply if a token has already been generated in the text.
                Bigger values mean the model will be less likely to repeat the same token in the text.
            - `logit_bias`: Add a logit bias to specific tokens. The keys of the dictionary are tokens, and the
                values are the bias to add to that token.
            - `response_format`: A JSON schema or a Pydantic model that enforces the structure of the model's response.
                If provided, the output will always be validated against this
                format (unless the model returns a tool call).
                For details, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs).
                Notes:
                - This parameter accepts Pydantic models and JSON schemas for the latest models, starting from GPT-4o.
                  Older models only support a basic version of structured outputs through `{"type": "json_object"}`.
                  For detailed information on JSON mode, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs#json-mode).
                - For structured outputs with streaming,
                  the `response_format` must be a JSON schema and not a Pydantic model.
        :param timeout:
            Timeout for OpenAI client calls. If not set, it defaults to either the
            `OPENAI_TIMEOUT` environment variable or 30 seconds.
        :param max_retries:
            Maximum number of retries to contact OpenAI after an internal error.
            If not set, it defaults to either the `OPENAI_MAX_RETRIES` environment variable or 5.
        :param tools:
            A list of Tool and/or Toolset objects, or a single Toolset, for which the model can prepare calls.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
        :param http_client_kwargs:
            A dictionary of keyword arguments to configure a custom `httpx.Client` or `httpx.AsyncClient`.
            For more information, see the [HTTPX documentation](https://www.python-httpx.org/api/#client).
        """
        self.api_key = api_key
        self.model = model
        self.generation_kwargs = generation_kwargs or {}
        self.streaming_callback = streaming_callback
        self.api_base_url = api_base_url
        self.organization = organization
        self.timeout = timeout
        self.max_retries = max_retries
        self.tools = tools  # Store tools as-is, whether it's a list or a Toolset
        self.tools_strict = tools_strict
        self.http_client_kwargs = http_client_kwargs
        # Check for duplicate tool names
        _check_duplicate_tool_names(flatten_tools_or_toolsets(self.tools))

        if timeout is None:
            timeout = float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
        if max_retries is None:
            max_retries = int(os.environ.get("OPENAI_MAX_RETRIES", "5"))

        client_kwargs: dict[str, Any] = {
            "api_key": api_key.resolve_value(),
            "organization": organization,
            "base_url": api_base_url,
            "timeout": timeout,
            "max_retries": max_retries,
        }

        self.client = OpenAI(http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_kwargs)
        self.async_client = AsyncOpenAI(
            http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_kwargs
        )
        self._is_warmed_up = False

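    # Structured-output sketch (illustrative, not part of the original file; `CityFacts`
    # is a hypothetical Pydantic model):
    #
    #   class CityFacts(BaseModel):
    #       city: str
    #       population: int
    #
    #   generator = OpenAIChatGenerator(generation_kwargs={"response_format": CityFacts})
    #   reply = generator.run([ChatMessage.from_user("Tell me about Paris.")])["replies"][0]
    #   data = json.loads(reply.text)  # the text is validated against the CityFacts schema
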
    def warm_up(self):
        """
        Warm up the OpenAI chat generator.

        This will warm up the tools registered in the chat generator.
        This method is idempotent and will only warm up the tools once.
        """
        if not self._is_warmed_up:
            warm_up_tools(self.tools)
            self._is_warmed_up = True

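    # Usage sketch (illustrative, not part of the original file): inside a Pipeline,
    # warm_up() is typically called automatically before the first run; when using the
    # component standalone with tools that need initialization, call it explicitly.
    # `my_tool` below is a hypothetical Tool.
    #
    #   generator = OpenAIChatGenerator(tools=[my_tool])
    #   generator.warm_up()
    #   result = generator.run([ChatMessage.from_user("What's the weather in Paris?")])
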
    def _get_telemetry_data(self) -> dict[str, Any]:
        """
        Data that is sent to Posthog for usage analytics.
        """
        return {"model": self.model}

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize this component to a dictionary.

        :returns:
            The serialized component as a dictionary.
        """
        callback_name = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        generation_kwargs = self.generation_kwargs.copy()
        response_format = generation_kwargs.get("response_format")

        # If the response format is a Pydantic model, it's converted to OpenAI's JSON schema format
        # If it's already a JSON schema, it's left as is
        if response_format and isinstance(response_format, type) and issubclass(response_format, BaseModel):
            json_schema = {
                "type": "json_schema",
                "json_schema": {
                    "name": response_format.__name__,
                    "strict": True,
                    "schema": to_strict_json_schema(response_format),
                },
            }
            generation_kwargs["response_format"] = json_schema

        return default_to_dict(
            self,
            model=self.model,
            streaming_callback=callback_name,
            api_base_url=self.api_base_url,
            organization=self.organization,
            generation_kwargs=generation_kwargs,
            api_key=self.api_key.to_dict(),
            timeout=self.timeout,
            max_retries=self.max_retries,
            tools=serialize_tools_or_toolset(self.tools),
            tools_strict=self.tools_strict,
            http_client_kwargs=self.http_client_kwargs,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "OpenAIChatGenerator":
        """
        Deserialize this component from a dictionary.

        :param data: The dictionary representation of this component.
        :returns:
            The deserialized component instance.
        """
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
        deserialize_tools_or_toolset_inplace(data["init_parameters"], key="tools")
        init_params = data.get("init_parameters", {})
        serialized_callback_handler = init_params.get("streaming_callback")

        if serialized_callback_handler:
            data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)
        return default_from_dict(cls, data)

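    # Serialization round-trip sketch (illustrative, not part of the original file):
    #
    #   generator = OpenAIChatGenerator(model="gpt-4o-mini")
    #   data = generator.to_dict()  # plain dict, safe to store e.g. in pipeline YAML
    #   restored = OpenAIChatGenerator.from_dict(data)
    #
    # Note that the API key is serialized as a Secret reference (e.g. the env var name),
    # not as the resolved value.
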
    @component.output_types(replies=list[ChatMessage])
    def run(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        *,
        tools: Optional[ToolsType] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Invokes chat completion based on the provided messages and generation parameters.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat/create).
        :param tools:
            A list of Tool and/or Toolset objects, or a single Toolset, for which the model can prepare calls.
            If set, it will override the `tools` parameter provided during initialization.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
        """
        if len(messages) == 0:
            return {"replies": []}

        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
        )
        chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion, ParsedChatCompletion]

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )
        openai_endpoint = api_args.pop("openai_endpoint")
        openai_endpoint_method = getattr(self.client.chat.completions, openai_endpoint)
        chat_completion = openai_endpoint_method(**api_args)

        if streaming_callback is not None:
            completions = self._handle_stream_response(
                # we cannot check isinstance(chat_completion, Stream) because some observability tools wrap Stream
                # and return a different type. See https://github.com/deepset-ai/haystack/issues/9014.
                chat_completion,  # type: ignore
                streaming_callback,
            )
        else:
            assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
            completions = [
                _convert_chat_completion_to_chat_message(chat_completion, choice) for choice in chat_completion.choices
            ]

        # before returning, do post-processing of the completions
        for message in completions:
            _check_finish_reason(message.meta)

        return {"replies": completions}

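    # Streaming sketch (illustrative, not part of the original file): pass a callback
    # that receives each StreamingChunk as it arrives.
    #
    #   def print_chunk(chunk: StreamingChunk) -> None:
    #       print(chunk.content, end="", flush=True)
    #
    #   generator = OpenAIChatGenerator(streaming_callback=print_chunk)
    #   result = generator.run([ChatMessage.from_user("Write a haiku about NLP.")])
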
    @component.output_types(replies=list[ChatMessage])
    async def run_async(
        self,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        *,
        tools: Optional[ToolsType] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Asynchronously invokes chat completion based on the provided messages and generation parameters.

        This is the asynchronous version of the `run` method. It has the same parameters and return values
        but can be used with `await` in async code.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
            Must be a coroutine.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat/create).
        :param tools: A list of Tool and/or Toolset objects, or a single Toolset, for which the model can prepare calls.
            If set, it will override the `tools` parameter provided during initialization.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
        """
        # validate and select the streaming callback
        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
        )
        chat_completion: Union[AsyncStream[ChatCompletionChunk], ChatCompletion, ParsedChatCompletion]

        if len(messages) == 0:
            return {"replies": []}

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )

        openai_endpoint = api_args.pop("openai_endpoint")
        openai_endpoint_method = getattr(self.async_client.chat.completions, openai_endpoint)
        chat_completion = await openai_endpoint_method(**api_args)

        if streaming_callback is not None:
            completions = await self._handle_async_stream_response(
                # we cannot check isinstance(chat_completion, AsyncStream) because some observability tools wrap
                # AsyncStream and return a different type. See https://github.com/deepset-ai/haystack/issues/9014.
                chat_completion,  # type: ignore
                streaming_callback,
            )
        else:
            assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
            completions = [
                _convert_chat_completion_to_chat_message(chat_completion, choice) for choice in chat_completion.choices
            ]

        # before returning, do post-processing of the completions
        for message in completions:
            _check_finish_reason(message.meta)

        return {"replies": completions}

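    # Async sketch (illustrative, not part of the original file):
    #
    #   import asyncio
    #
    #   async def main() -> None:
    #       generator = OpenAIChatGenerator()
    #       result = await generator.run_async([ChatMessage.from_user("Hello!")])
    #       print(result["replies"][0].text)
    #
    #   asyncio.run(main())
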
    def _prepare_api_call(  # noqa: PLR0913
        self,
        *,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[ToolsType] = None,
        tools_strict: Optional[bool] = None,
    ) -> dict[str, Any]:
        # update generation kwargs by merging with the generation kwargs passed to the run method
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}

        is_streaming = streaming_callback is not None
        num_responses = generation_kwargs.pop("n", 1)

        if is_streaming and num_responses > 1:
            raise ValueError("Cannot stream multiple responses, please set n=1.")
        response_format = generation_kwargs.pop("response_format", None)

        # adapt ChatMessage(s) to the format expected by the OpenAI API
        openai_formatted_messages = [message.to_openai_dict_format() for message in messages]

        flattened_tools = flatten_tools_or_toolsets(tools or self.tools)
        tools_strict = tools_strict if tools_strict is not None else self.tools_strict
        _check_duplicate_tool_names(flattened_tools)

        openai_tools = {}
        if flattened_tools:
            tool_definitions = []
            for t in flattened_tools:
                function_spec = {**t.tool_spec}
                if tools_strict:
                    function_spec["strict"] = True
                    function_spec["parameters"]["additionalProperties"] = False
                tool_definitions.append({"type": "function", "function": function_spec})
            openai_tools = {"tools": tool_definitions}

        base_args = {
            "model": self.model,
            "messages": openai_formatted_messages,
            "n": num_responses,
            **openai_tools,
            **generation_kwargs,
        }

        if response_format and not is_streaming:
            # for structured outputs without streaming, we use OpenAI's parse endpoint
            # Note: `stream` cannot be passed to chat.completions.parse
            # we pass a key `openai_endpoint` as a hint to the run method to use the parse endpoint
            # this key will be removed before the API call is made
            return {**base_args, "response_format": response_format, "openai_endpoint": "parse"}

        # for structured outputs with streaming, we use OpenAI's create endpoint
        # we pass a key `openai_endpoint` as a hint to the run method to use the create endpoint
        # this key will be removed before the API call is made
        final_args = {**base_args, "stream": is_streaming, "openai_endpoint": "create"}

        # We only set the response_format parameter if it's not None since None is not a valid value in the API.
        if response_format:
            final_args["response_format"] = response_format
        return final_args

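    # Payload sketch (illustrative, not part of the original file): assuming a tool whose
    # tool_spec provides name/description/parameters, with tools_strict=True each tool is
    # sent to the API roughly as
    #
    #   {
    #       "type": "function",
    #       "function": {
    #           "name": "get_weather",  # hypothetical tool name
    #           "description": "...",
    #           "parameters": {..., "additionalProperties": False},
    #           "strict": True,
    #       },
    #   }
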
    def _handle_stream_response(self, chat_completion: Stream, callback: SyncStreamingCallbackT) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []
        for chunk in chat_completion:  # pylint: disable=not-an-iterable
            assert len(chunk.choices) <= 1, "Streaming responses should have at most one choice."
            chunk_delta = _convert_chat_completion_chunk_to_streaming_chunk(
                chunk=chunk, previous_chunks=chunks, component_info=component_info
            )
            chunks.append(chunk_delta)
            callback(chunk_delta)
        return [_convert_streaming_chunks_to_chat_message(chunks=chunks)]

    async def _handle_async_stream_response(
        self, chat_completion: AsyncStream, callback: AsyncStreamingCallbackT
    ) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []
        async for chunk in chat_completion:  # pylint: disable=not-an-iterable
            assert len(chunk.choices) <= 1, "Streaming responses should have at most one choice."
            chunk_delta = _convert_chat_completion_chunk_to_streaming_chunk(
                chunk=chunk, previous_chunks=chunks, component_info=component_info
            )
            chunks.append(chunk_delta)
            await callback(chunk_delta)
        return [_convert_streaming_chunks_to_chat_message(chunks=chunks)]


def _check_finish_reason(meta: dict[str, Any]) -> None:
    if meta["finish_reason"] == "length":
        logger.warning(
            "The completion for index {index} has been truncated before reaching a natural stopping point. "
            "Increase the max_completion_tokens parameter to allow for longer completions.",
            index=meta["index"],
            finish_reason=meta["finish_reason"],
        )
    if meta["finish_reason"] == "content_filter":
        logger.warning(
            "The completion for index {index} has been truncated due to the content filter.",
            index=meta["index"],
            finish_reason=meta["finish_reason"],
        )


def _convert_chat_completion_to_chat_message(
    completion: Union[ChatCompletion, ParsedChatCompletion], choice: Choice
) -> ChatMessage:
    """
    Converts the non-streaming response from the OpenAI API to a ChatMessage.

    :param completion: The completion returned by the OpenAI API.
    :param choice: The choice returned by the OpenAI API.
    :return: The ChatMessage.
    """
    message: Union[ChatCompletionMessage, ParsedChatCompletionMessage] = choice.message
    text = message.content
    tool_calls = []
    if message.tool_calls:
        # we currently only support function tools (not custom tools)
        # https://platform.openai.com/docs/guides/function-calling#custom-tools
        openai_tool_calls = [tc for tc in message.tool_calls if not isinstance(tc, ChatCompletionMessageCustomToolCall)]
        for openai_tc in openai_tool_calls:
            arguments_str = openai_tc.function.arguments
            try:
                arguments = json.loads(arguments_str)
                tool_calls.append(ToolCall(id=openai_tc.id, tool_name=openai_tc.function.name, arguments=arguments))
            except json.JSONDecodeError:
                logger.warning(
                    "OpenAI returned a malformed JSON string for tool call arguments. This tool call "
                    "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                    "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                    _id=openai_tc.id,
                    _name=openai_tc.function.name,
                    _arguments=arguments_str,
                )

    chat_message = ChatMessage.from_assistant(
        text=text,
        tool_calls=tool_calls,
        meta={
            "model": completion.model,
            "index": choice.index,
            "finish_reason": choice.finish_reason,
            "usage": _serialize_usage(completion.usage),
        },
    )

    return chat_message


def _convert_chat_completion_chunk_to_streaming_chunk(
    chunk: ChatCompletionChunk, previous_chunks: list[StreamingChunk], component_info: Optional[ComponentInfo] = None
) -> StreamingChunk:
    """
    Converts the streaming response chunk from the OpenAI API to a StreamingChunk.

    :param chunk: The chunk returned by the OpenAI API.
    :param previous_chunks: A list of previously received StreamingChunks.
    :param component_info: An optional `ComponentInfo` object containing information about the component that
        generated the chunk, such as the component name and type.

    :returns:
        A StreamingChunk object representing the content of the chunk from the OpenAI API.
    """
    finish_reason_mapping: dict[str, FinishReason] = {
        "stop": "stop",
        "length": "length",
        "content_filter": "content_filter",
        "tool_calls": "tool_calls",
        "function_call": "tool_calls",
    }
    # On the very first chunk (len(previous_chunks) == 0), the choices field only provides role info
    # (e.g. "assistant"). choices is also empty when include_usage is set to True and only usage
    # information is returned.
    if len(chunk.choices) == 0:
        return StreamingChunk(
            content="",
            component_info=component_info,
            # Index is None since it's only set to an int when a content block is present
            index=None,
            finish_reason=None,
            meta={
                "model": chunk.model,
                "received_at": datetime.now().isoformat(),
                "usage": _serialize_usage(chunk.usage),
            },
        )

    choice: ChunkChoice = chunk.choices[0]

    # create a list of ToolCallDelta objects from the tool calls
    if choice.delta.tool_calls:
        tool_calls_deltas = []
        for tool_call in choice.delta.tool_calls:
            function = tool_call.function
            tool_calls_deltas.append(
                ToolCallDelta(
                    index=tool_call.index,
                    id=tool_call.id,
                    tool_name=function.name if function else None,
                    arguments=function.arguments if function and function.arguments else None,
                )
            )
        chunk_message = StreamingChunk(
            content=choice.delta.content or "",
            component_info=component_info,
            # We adopt the first tool_calls_deltas.index as the overall index of the chunk.
            index=tool_calls_deltas[0].index,
            tool_calls=tool_calls_deltas,
            start=tool_calls_deltas[0].tool_name is not None,
            finish_reason=finish_reason_mapping.get(choice.finish_reason) if choice.finish_reason else None,
            meta={
                "model": chunk.model,
                "index": choice.index,
                "tool_calls": choice.delta.tool_calls,
                "finish_reason": choice.finish_reason,
                "received_at": datetime.now().isoformat(),
                "usage": _serialize_usage(chunk.usage),
            },
        )
        return chunk_message

    # On the very first chunk, the choice field only provides role info (e.g. "assistant"), so we set index to None.
    # We also set index to None for any chunk missing the content field, e.g. a chunk that only contains a
    # finish reason.
    if choice.delta.content is None or choice.delta.role is not None:
        resolved_index = None
    else:
        # We set the index to 0 since, if text content is being streamed, no tool calls are being streamed
        # NOTE: We may need to revisit this if OpenAI allows planning/thinking content before tool calls like
        #       Anthropic Claude
        resolved_index = 0
    chunk_message = StreamingChunk(
        content=choice.delta.content or "",
        component_info=component_info,
        index=resolved_index,
        # The first chunk is always a start message chunk that only contains role information, so if we reach here
        # and previous_chunks has length 1, then this is the start of text content.
        start=len(previous_chunks) == 1,
        finish_reason=finish_reason_mapping.get(choice.finish_reason) if choice.finish_reason else None,
        meta={
            "model": chunk.model,
            "index": choice.index,
            "tool_calls": choice.delta.tool_calls,
            "finish_reason": choice.finish_reason,
            "received_at": datetime.now().isoformat(),
            "usage": _serialize_usage(chunk.usage),
        },
    )
    return chunk_message


def _serialize_usage(usage):
    """Convert an OpenAI usage object to a serializable dict recursively."""
    if hasattr(usage, "model_dump"):
        return usage.model_dump()
    elif hasattr(usage, "__dict__"):
        return {k: _serialize_usage(v) for k, v in usage.__dict__.items() if not k.startswith("_")}
    elif isinstance(usage, dict):
        return {k: _serialize_usage(v) for k, v in usage.items()}
    elif isinstance(usage, list):
        return [_serialize_usage(item) for item in usage]
    else:
        return usage