
deepset-ai / haystack, build 15165238383
21 May 2025 02:42PM UTC. Coverage: 90.404% (-0.04%) from 90.443%
Pull Request #9275: feat: return common type in SuperComponent type compatibility check
Merge 82e69fe2c into 17432f710
11135 of 12317 relevant lines covered (90.4%), 0.9 hits per line

Source File: haystack/components/generators/chat/openai.py (95.11% of lines covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from openai.types.chat import ChatCompletion, ChatCompletionChunk, ChatCompletionMessage
from openai.types.chat.chat_completion import Choice
from openai.types.chat.chat_completion_chunk import Choice as ChunkChoice

from haystack import component, default_from_dict, default_to_dict, logging
from haystack.dataclasses import (
    AsyncStreamingCallbackT,
    ChatMessage,
    StreamingCallbackT,
    StreamingChunk,
    SyncStreamingCallbackT,
    ToolCall,
    select_streaming_callback,
)
from haystack.tools import (
    Tool,
    Toolset,
    _check_duplicate_tool_names,
    deserialize_tools_or_toolset_inplace,
    serialize_tools_or_toolset,
)
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable
from haystack.utils.http_client import init_http_client

logger = logging.getLogger(__name__)


@component
class OpenAIChatGenerator:
    """
    Completes chats using OpenAI's large language models (LLMs).

    It works with the gpt-4 and o-series models and supports streaming responses
    from the OpenAI API. It uses the [ChatMessage](https://docs.haystack.deepset.ai/docs/chatmessage)
    format for input and output.

    You can customize how the text is generated by passing parameters to the
    OpenAI API. Use the `**generation_kwargs` argument when you initialize
    the component or when you run it. Any parameter that works with
    `openai.ChatCompletion.create` will work here too.

    For details on OpenAI API parameters, see the
    [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat).

    ### Usage example

    ```python
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.dataclasses import ChatMessage

    messages = [ChatMessage.from_user("What's Natural Language Processing?")]

    client = OpenAIChatGenerator()
    response = client.run(messages)
    print(response)
    ```
    Output:
    ```
    {'replies':
        [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=
        [TextContent(text="Natural Language Processing (NLP) is a branch of artificial intelligence
            that focuses on enabling computers to understand, interpret, and generate human language in
            a way that is meaningful and useful.")],
         _name=None,
         _meta={'model': 'gpt-4o-mini', 'index': 0, 'finish_reason': 'stop',
         'usage': {'prompt_tokens': 15, 'completion_tokens': 36, 'total_tokens': 51}})
        ]
    }
    ```
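
    ### Streaming example

    A minimal sketch (added for illustration, with a hypothetical prompt): any
    callable accepting a [StreamingChunk](https://docs.haystack.deepset.ai/docs/data-classes#streamingchunk)
    can be passed as `streaming_callback` to receive tokens as they arrive.

    ```python
    from haystack.components.generators.chat import OpenAIChatGenerator
    from haystack.dataclasses import ChatMessage

    client = OpenAIChatGenerator(streaming_callback=lambda chunk: print(chunk.content, end=""))
    client.run([ChatMessage.from_user("Summarize NLP in one sentence.")])
    ```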
    """

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-4o-mini",
        streaming_callback: Optional[StreamingCallbackT] = None,
        api_base_url: Optional[str] = None,
        organization: Optional[str] = None,
        generation_kwargs: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        tools: Optional[Union[List[Tool], Toolset]] = None,
        tools_strict: bool = False,
        http_client_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """
        Creates an instance of OpenAIChatGenerator. Unless specified otherwise in `model`, uses OpenAI's gpt-4o-mini.

        Before initializing the component, you can set the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES'
        environment variables to override the `timeout` and `max_retries` parameters respectively
        in the OpenAI client.

        :param api_key: The OpenAI API key.
            You can set it with the environment variable `OPENAI_API_KEY`, or pass it via this parameter
            during initialization.
        :param model: The name of the model to use.
        :param streaming_callback: A callback function that is called when a new token is received from the stream.
            The callback function accepts [StreamingChunk](https://docs.haystack.deepset.ai/docs/data-classes#streamingchunk)
            as an argument.
        :param api_base_url: An optional base URL.
        :param organization: Your organization ID, defaults to `None`. See
            [production best practices](https://platform.openai.com/docs/guides/production-best-practices/setting-up-your-organization).
        :param generation_kwargs: Other parameters to use for the model. These parameters are sent directly to
            the OpenAI endpoint. See the OpenAI [documentation](https://platform.openai.com/docs/api-reference/chat) for
            more details.
            Some of the supported parameters:
            - `max_tokens`: The maximum number of tokens the output text can have.
            - `temperature`: What sampling temperature to use. Higher values mean the model will take more risks.
                Try 0.9 for more creative applications and 0 (argmax sampling) for ones with a well-defined answer.
            - `top_p`: An alternative to sampling with temperature, called nucleus sampling, where the model
                considers the results of the tokens with top_p probability mass. For example, 0.1 means only the tokens
                comprising the top 10% probability mass are considered.
            - `n`: How many completions to generate for each prompt. For example, if the LLM gets 3 prompts and n is 2,
                it will generate two completions for each of the three prompts, ending up with 6 completions in total.
            - `stop`: One or more sequences after which the LLM should stop generating tokens.
            - `presence_penalty`: The penalty to apply if a token is already present in the text. Bigger values mean
                the model will be less likely to repeat the same token.
            - `frequency_penalty`: The penalty to apply if a token has already been generated in the text.
                Bigger values mean the model will be less likely to repeat the same token.
            - `logit_bias`: Adds a logit bias to specific tokens. The keys of the dictionary are tokens, and the
                values are the bias to add to that token.
        :param timeout:
            Timeout for OpenAI client calls. If not set, it defaults to the `OPENAI_TIMEOUT` environment
            variable, or 30 seconds.
        :param max_retries:
            Maximum number of retries to contact OpenAI after an internal error.
            If not set, it defaults to the `OPENAI_MAX_RETRIES` environment variable, or 5.
        :param tools:
            A list of tools or a Toolset for which the model can prepare calls. This parameter can accept either a
            list of `Tool` objects or a `Toolset` instance.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
        :param http_client_kwargs:
            A dictionary of keyword arguments to configure a custom `httpx.Client` or `httpx.AsyncClient`.
            For more information, see the [HTTPX documentation](https://www.python-httpx.org/api/#client).
        """
        self.api_key = api_key
        self.model = model
        self.generation_kwargs = generation_kwargs or {}
        self.streaming_callback = streaming_callback
        self.api_base_url = api_base_url
        self.organization = organization
        self.timeout = timeout
        self.max_retries = max_retries
        self.tools = tools  # Store tools as-is, whether it's a list or a Toolset
        self.tools_strict = tools_strict
        self.http_client_kwargs = http_client_kwargs
        # Check for duplicate tool names
        _check_duplicate_tool_names(list(self.tools or []))

        if timeout is None:
            timeout = float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
        if max_retries is None:
            max_retries = int(os.environ.get("OPENAI_MAX_RETRIES", "5"))

        client_kwargs: Dict[str, Any] = {
            "api_key": api_key.resolve_value(),
            "organization": organization,
            "base_url": api_base_url,
            "timeout": timeout,
            "max_retries": max_retries,
        }

        self.client = OpenAI(http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_kwargs)
        self.async_client = AsyncOpenAI(
            http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_kwargs
        )
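
    # Illustrative note (added, not in the original source; values hypothetical):
    # the env-var fallback above applies only when the corresponding argument is
    # left as None, e.g.
    #
    #   os.environ["OPENAI_TIMEOUT"] = "60.0"       # used because timeout=None
    #   os.environ["OPENAI_MAX_RETRIES"] = "3"      # used because max_retries=None
    #   generator = OpenAIChatGenerator()
    #   generator = OpenAIChatGenerator(timeout=10.0)  # explicit value wins over the env var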

    def _get_telemetry_data(self) -> Dict[str, Any]:
        """
        Data that is sent to Posthog for usage analytics.
        """
        return {"model": self.model}

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize this component to a dictionary.

        :returns:
            The serialized component as a dictionary.
        """
        callback_name = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        return default_to_dict(
            self,
            model=self.model,
            streaming_callback=callback_name,
            api_base_url=self.api_base_url,
            organization=self.organization,
            generation_kwargs=self.generation_kwargs,
            api_key=self.api_key.to_dict(),
            timeout=self.timeout,
            max_retries=self.max_retries,
            tools=serialize_tools_or_toolset(self.tools),
            tools_strict=self.tools_strict,
            http_client_kwargs=self.http_client_kwargs,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "OpenAIChatGenerator":
        """
        Deserialize this component from a dictionary.

        :param data: The dictionary representation of this component.
        :returns:
            The deserialized component instance.
        """
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
        deserialize_tools_or_toolset_inplace(data["init_parameters"], key="tools")
        init_params = data.get("init_parameters", {})
        serialized_callback_handler = init_params.get("streaming_callback")
        if serialized_callback_handler:
            data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)
        return default_from_dict(cls, data)
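
    # Illustrative round trip (added sketch; `my_module.print_chunk` is a
    # hypothetical callback): to_dict() stores the streaming callback as an
    # importable path string, which from_dict() resolves back to the callable:
    #
    #   gen = OpenAIChatGenerator(streaming_callback=my_module.print_chunk)
    #   data = gen.to_dict()
    #   restored = OpenAIChatGenerator.from_dict(data)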

    @component.output_types(replies=List[ChatMessage])
    def run(
        self,
        messages: List[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[Dict[str, Any]] = None,
        *,
        tools: Optional[Union[List[Tool], Toolset]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Invokes chat completion based on the provided messages and generation parameters.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat/create).
        :param tools:
            A list of tools or a Toolset for which the model can prepare calls. If set, it will override the
            `tools` parameter set during component initialization. This parameter can accept either a list of
            `Tool` objects or a `Toolset` instance.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
        """
        if len(messages) == 0:
            return {"replies": []}

        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
        )

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )
        chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion] = self.client.chat.completions.create(
            **api_args
        )

        if streaming_callback is not None:
            completions = self._handle_stream_response(
                chat_completion,  # type: ignore
                streaming_callback,  # type: ignore
            )

        else:
            assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
            completions = [
                self._convert_chat_completion_to_chat_message(chat_completion, choice)
                for choice in chat_completion.choices
            ]

        # before returning, do post-processing of the completions
        for message in completions:
            self._check_finish_reason(message.meta)

        return {"replies": completions}
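
    # Illustrative call (added sketch, hypothetical values): kwargs passed at
    # run time override the ones given at init time, for that call only:
    #
    #   gen = OpenAIChatGenerator(generation_kwargs={"temperature": 0.9})
    #   gen.run(messages, generation_kwargs={"temperature": 0.0})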

    @component.output_types(replies=List[ChatMessage])
    async def run_async(
        self,
        messages: List[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[Dict[str, Any]] = None,
        *,
        tools: Optional[Union[List[Tool], Toolset]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Asynchronously invokes chat completion based on the provided messages and generation parameters.

        This is the asynchronous version of the `run` method. It has the same parameters and return values
        but can be used with `await` in async code.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
            Must be a coroutine.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat/create).
        :param tools:
            A list of tools or a Toolset for which the model can prepare calls. If set, it will override the
            `tools` parameter set during component initialization. This parameter can accept either a list of
            `Tool` objects or a `Toolset` instance.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
        """
        # validate and select the streaming callback
        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
        )

        if len(messages) == 0:
            return {"replies": []}

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )

        chat_completion: Union[
            AsyncStream[ChatCompletionChunk], ChatCompletion
        ] = await self.async_client.chat.completions.create(**api_args)

        if streaming_callback is not None:
            completions = await self._handle_async_stream_response(
                chat_completion,  # type: ignore
                streaming_callback,  # type: ignore
            )

        else:
            assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
            completions = [
                self._convert_chat_completion_to_chat_message(chat_completion, choice)
                for choice in chat_completion.choices
            ]

        # before returning, do post-processing of the completions
        for message in completions:
            self._check_finish_reason(message.meta)

        return {"replies": completions}
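
    # Illustrative async usage (added sketch, hypothetical callback): the
    # callback passed to run_async must be a coroutine function:
    #
    #   async def on_chunk(chunk: StreamingChunk) -> None:
    #       print(chunk.content, end="")
    #
    #   result = await gen.run_async(messages, streaming_callback=on_chunk)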
373

374
    def _prepare_api_call(  # noqa: PLR0913
1✔
375
        self,
376
        *,
377
        messages: List[ChatMessage],
378
        streaming_callback: Optional[StreamingCallbackT] = None,
379
        generation_kwargs: Optional[Dict[str, Any]] = None,
380
        tools: Optional[Union[List[Tool], Toolset]] = None,
381
        tools_strict: Optional[bool] = None,
382
    ) -> Dict[str, Any]:
383
        # update generation kwargs by merging with the generation kwargs passed to the run method
384
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
1✔
385

386
        # adapt ChatMessage(s) to the format expected by the OpenAI API
387
        openai_formatted_messages = [message.to_openai_dict_format() for message in messages]
1✔
388

389
        tools = tools or self.tools
1✔
390
        if isinstance(tools, Toolset):
1✔
391
            tools = list(tools)
×
392
        tools_strict = tools_strict if tools_strict is not None else self.tools_strict
1✔
393
        _check_duplicate_tool_names(tools)
1✔
394

395
        openai_tools = {}
1✔
396
        if tools:
1✔
397
            tool_definitions = []
1✔
398
            for t in tools:
1✔
399
                function_spec = {**t.tool_spec}
1✔
400
                if tools_strict:
1✔
401
                    function_spec["strict"] = True
1✔
402
                    function_spec["parameters"]["additionalProperties"] = False
1✔
403
                tool_definitions.append({"type": "function", "function": function_spec})
1✔
404
            openai_tools = {"tools": tool_definitions}
1✔
405

406
        is_streaming = streaming_callback is not None
1✔
407
        num_responses = generation_kwargs.pop("n", 1)
1✔
408
        if is_streaming and num_responses > 1:
1✔
409
            raise ValueError("Cannot stream multiple responses, please set n=1.")
×
410

411
        return {
1✔
412
            "model": self.model,
413
            "messages": openai_formatted_messages,  # type: ignore[arg-type] # openai expects list of specific message types
414
            "stream": streaming_callback is not None,
415
            "n": num_responses,
416
            **openai_tools,
417
            **generation_kwargs,
418
        }
419
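
    # Shape of the returned payload (added illustration, values hypothetical):
    #
    #   {"model": "gpt-4o-mini", "messages": [...], "stream": False, "n": 1,
    #    "tools": [{"type": "function", "function": {...}}], "temperature": 0.7}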

    def _handle_stream_response(self, chat_completion: Stream, callback: SyncStreamingCallbackT) -> List[ChatMessage]:
        chunks: List[StreamingChunk] = []
        chunk = None
        chunk_delta: StreamingChunk

        for chunk in chat_completion:  # pylint: disable=not-an-iterable
            assert len(chunk.choices) <= 1, "Streaming responses should have at most one choice."
            chunk_delta = self._convert_chat_completion_chunk_to_streaming_chunk(chunk)
            chunks.append(chunk_delta)
            callback(chunk_delta)
        return [self._convert_streaming_chunks_to_chat_message(chunk, chunks)]

    async def _handle_async_stream_response(
        self, chat_completion: AsyncStream, callback: AsyncStreamingCallbackT
    ) -> List[ChatMessage]:
        chunks: List[StreamingChunk] = []
        chunk = None
        chunk_delta: StreamingChunk

        async for chunk in chat_completion:  # pylint: disable=not-an-iterable
            assert len(chunk.choices) <= 1, "Streaming responses should have at most one choice."
            chunk_delta = self._convert_chat_completion_chunk_to_streaming_chunk(chunk)
            chunks.append(chunk_delta)
            await callback(chunk_delta)
        return [self._convert_streaming_chunks_to_chat_message(chunk, chunks)]

    def _check_finish_reason(self, meta: Dict[str, Any]) -> None:
        if meta["finish_reason"] == "length":
            logger.warning(
                "The completion for index {index} has been truncated before reaching a natural stopping point. "
                "Increase the max_tokens parameter to allow for longer completions.",
                index=meta["index"],
                finish_reason=meta["finish_reason"],
            )
        if meta["finish_reason"] == "content_filter":
            logger.warning(
                "The completion for index {index} has been truncated due to the content filter.",
                index=meta["index"],
                finish_reason=meta["finish_reason"],
            )

    def _convert_streaming_chunks_to_chat_message(
        self, last_chunk: ChatCompletionChunk, chunks: List[StreamingChunk]
    ) -> ChatMessage:
        """
        Connects the streaming chunks into a single ChatMessage.

        :param last_chunk: The last chunk returned by the OpenAI API.
        :param chunks: The list of all `StreamingChunk` objects.

        :returns: The ChatMessage.
        """
        text = "".join([chunk.content for chunk in chunks])
        tool_calls = []

        # Process tool calls if present in any chunk
        tool_call_data: Dict[int, Dict[str, str]] = {}  # Track tool calls by index (delta.index is an int)
        for chunk_payload in chunks:
            tool_calls_meta = chunk_payload.meta.get("tool_calls")
            if tool_calls_meta is not None:
                for delta in tool_calls_meta:
                    # We use the index of the tool call to track it across chunks since the ID is not always provided
                    if delta.index not in tool_call_data:
                        tool_call_data[delta.index] = {"id": "", "name": "", "arguments": ""}

                    # Save the ID if present
                    if delta.id is not None:
                        tool_call_data[delta.index]["id"] = delta.id

                    if delta.function is not None:
                        if delta.function.name is not None:
                            tool_call_data[delta.index]["name"] += delta.function.name
                        if delta.function.arguments is not None:
                            tool_call_data[delta.index]["arguments"] += delta.function.arguments
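
        # Example of accumulation across deltas (added illustration, values
        # hypothetical): chunks carrying name="get_", name="weather", and
        # arguments='{"city": "Paris"}' at index 0 merge into
        #   {0: {"id": "call_abc", "name": "get_weather", "arguments": '{"city": "Paris"}'}}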

        # Convert accumulated tool call data into ToolCall objects
        for call_data in tool_call_data.values():
            try:
                arguments = json.loads(call_data["arguments"])
                tool_calls.append(ToolCall(id=call_data["id"], tool_name=call_data["name"], arguments=arguments))
            except json.JSONDecodeError:
                logger.warning(
                    "OpenAI returned a malformed JSON string for tool call arguments. This tool call "
                    "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                    "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                    _id=call_data["id"],
                    _name=call_data["name"],
                    _arguments=call_data["arguments"],
                )

        # finish_reason can appear in different places so we look for the last one
        finish_reasons = [
            chunk.meta.get("finish_reason") for chunk in chunks if chunk.meta.get("finish_reason") is not None
        ]
        finish_reason = finish_reasons[-1] if finish_reasons else None

        meta = {
            "model": last_chunk.model,
            "index": 0,
            "finish_reason": finish_reason,
            "completion_start_time": chunks[0].meta.get("received_at"),  # first chunk received
            "usage": self._serialize_usage(last_chunk.usage),  # last chunk has the final usage data if available
        }

        return ChatMessage.from_assistant(text=text or None, tool_calls=tool_calls, meta=meta)

    def _convert_chat_completion_to_chat_message(self, completion: ChatCompletion, choice: Choice) -> ChatMessage:
        """
        Converts the non-streaming response from the OpenAI API to a ChatMessage.

        :param completion: The completion returned by the OpenAI API.
        :param choice: The choice returned by the OpenAI API.
        :return: The ChatMessage.
        """
        message: ChatCompletionMessage = choice.message
        text = message.content
        tool_calls = []
        if openai_tool_calls := message.tool_calls:
            for openai_tc in openai_tool_calls:
                arguments_str = openai_tc.function.arguments
                try:
                    arguments = json.loads(arguments_str)
                    tool_calls.append(ToolCall(id=openai_tc.id, tool_name=openai_tc.function.name, arguments=arguments))
                except json.JSONDecodeError:
                    logger.warning(
                        "OpenAI returned a malformed JSON string for tool call arguments. This tool call "
                        "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                        "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                        _id=openai_tc.id,
                        _name=openai_tc.function.name,
                        _arguments=arguments_str,
                    )

        chat_message = ChatMessage.from_assistant(text=text, tool_calls=tool_calls)
        chat_message._meta.update(
            {
                "model": completion.model,
                "index": choice.index,
                "finish_reason": choice.finish_reason,
                "usage": self._serialize_usage(completion.usage),
            }
        )
        return chat_message

    def _convert_chat_completion_chunk_to_streaming_chunk(self, chunk: ChatCompletionChunk) -> StreamingChunk:
        """
        Converts the streaming response chunk from the OpenAI API to a StreamingChunk.

        :param chunk: The chunk returned by the OpenAI API.

        :returns:
            The StreamingChunk.
        """
        # if there are no choices, return an empty chunk
        if len(chunk.choices) == 0:
            return StreamingChunk(content="", meta={"model": chunk.model, "received_at": datetime.now().isoformat()})

        # we stream the content of the chunk if it's not a tool or function call
        choice: ChunkChoice = chunk.choices[0]
        content = choice.delta.content or ""
        chunk_message = StreamingChunk(content)
        # but save the tool calls and function call in the meta if they are present
        # and then connect the chunks in the _convert_streaming_chunks_to_chat_message method
        chunk_message.meta.update(
            {
                "model": chunk.model,
                "index": choice.index,
                "tool_calls": choice.delta.tool_calls,
                "finish_reason": choice.finish_reason,
                "received_at": datetime.now().isoformat(),
            }
        )
        return chunk_message

    def _serialize_usage(self, usage):
        """Recursively convert an OpenAI usage object to a serializable dict."""
        if hasattr(usage, "model_dump"):
            return usage.model_dump()
        elif hasattr(usage, "__dict__"):
            return {k: self._serialize_usage(v) for k, v in usage.__dict__.items() if not k.startswith("_")}
        elif isinstance(usage, dict):
            return {k: self._serialize_usage(v) for k, v in usage.items()}
        elif isinstance(usage, list):
            return [self._serialize_usage(item) for item in usage]
        else:
            return usage
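
    # Illustrative result (added, hypothetical values): a pydantic CompletionUsage
    # object is flattened via model_dump() into a plain dict such as
    #   {"prompt_tokens": 15, "completion_tokens": 36, "total_tokens": 51}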