deepset-ai / haystack / build 15191527043
22 May 2025 04:08PM UTC. Coverage: 90.345% (-0.07%) from 90.411%
Pull Request #9426: feat: add component name and type to `StreamingChunk`
Merge 212e60881 into 4a5e4d3e6
11173 of 12367 relevant lines covered (90.35%), 0.9 hits per line

Source File: haystack/components/generators/chat/openai.py (95.26% covered)
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from openai.types.chat import ChatCompletion, ChatCompletionChunk, ChatCompletionMessage
from openai.types.chat.chat_completion import Choice
from openai.types.chat.chat_completion_chunk import Choice as ChunkChoice

from haystack import component, default_from_dict, default_to_dict, logging
from haystack.dataclasses import (
    AsyncStreamingCallbackT,
    ChatMessage,
    ComponentInfo,
    StreamingCallbackT,
    StreamingChunk,
    SyncStreamingCallbackT,
    ToolCall,
    select_streaming_callback,
)
from haystack.tools import (
    Tool,
    Toolset,
    _check_duplicate_tool_names,
    deserialize_tools_or_toolset_inplace,
    serialize_tools_or_toolset,
)
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable
from haystack.utils.http_client import init_http_client

logger = logging.getLogger(__name__)


@component
class OpenAIChatGenerator:
    """
42
    Completes chats using OpenAI's large language models (LLMs).
43

44
    It works with the gpt-4 and o-series models and supports streaming responses
45
    from OpenAI API. It uses [ChatMessage](https://docs.haystack.deepset.ai/docs/chatmessage)
46
    format in input and output.
47

48
    You can customize how the text is generated by passing parameters to the
49
    OpenAI API. Use the `**generation_kwargs` argument when you initialize
50
    the component or when you run it. Any parameter that works with
51
    `openai.ChatCompletion.create` will work here too.
52

53
    For details on OpenAI API parameters, see
54
    [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat).
55

56
    ### Usage example
57

58
    ```python
59
    from haystack.components.generators.chat import OpenAIChatGenerator
60
    from haystack.dataclasses import ChatMessage
61

62
    messages = [ChatMessage.from_user("What's Natural Language Processing?")]
63

64
    client = OpenAIChatGenerator()
65
    response = client.run(messages)
66
    print(response)
67
    ```
68
    Output:
69
    ```
70
    {'replies':
71
        [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=
72
        [TextContent(text="Natural Language Processing (NLP) is a branch of artificial intelligence
73
            that focuses on enabling computers to understand, interpret, and generate human language in
74
            a way that is meaningful and useful.")],
75
         _name=None,
76
         _meta={'model': 'gpt-4o-mini', 'index': 0, 'finish_reason': 'stop',
77
         'usage': {'prompt_tokens': 15, 'completion_tokens': 36, 'total_tokens': 51}})
78
        ]
79
    }
80
    ```
81
    """

    # Type annotation for the component name
    __component_name__: str

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-4o-mini",
        streaming_callback: Optional[StreamingCallbackT] = None,
        api_base_url: Optional[str] = None,
        organization: Optional[str] = None,
        generation_kwargs: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        tools: Optional[Union[List[Tool], Toolset]] = None,
        tools_strict: bool = False,
        http_client_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """
        Creates an instance of OpenAIChatGenerator. Unless specified otherwise in `model`, uses OpenAI's gpt-4o-mini.

        Before initializing the component, you can set the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES'
        environment variables to override the `timeout` and `max_retries` parameters respectively
        in the OpenAI client.

        :param api_key: The OpenAI API key.
            You can set it with the `OPENAI_API_KEY` environment variable, or pass it with this parameter
            during initialization.
        :param model: The name of the model to use.
        :param streaming_callback: A callback function that is called when a new token is received from the stream.
            The callback function accepts [StreamingChunk](https://docs.haystack.deepset.ai/docs/data-classes#streamingchunk)
            as an argument.
        :param api_base_url: An optional base URL.
        :param organization: Your organization ID, defaults to `None`. See
            [production best practices](https://platform.openai.com/docs/guides/production-best-practices/setting-up-your-organization).
        :param generation_kwargs: Other parameters to use for the model. These parameters are sent directly to
            the OpenAI endpoint. See OpenAI [documentation](https://platform.openai.com/docs/api-reference/chat) for
            more details.
            Some of the supported parameters:
            - `max_tokens`: The maximum number of tokens the output text can have.
            - `temperature`: What sampling temperature to use. Higher values mean the model will take more risks.
                Try 0.9 for more creative applications and 0 (argmax sampling) for ones with a well-defined answer.
            - `top_p`: An alternative to sampling with temperature, called nucleus sampling, where the model
                considers the results of the tokens with top_p probability mass. For example, 0.1 means only the tokens
                comprising the top 10% probability mass are considered.
            - `n`: How many completions to generate for each prompt. For example, if the LLM gets 3 prompts and n is 2,
                it will generate two completions for each of the three prompts, ending up with 6 completions in total.
            - `stop`: One or more sequences after which the LLM should stop generating tokens.
            - `presence_penalty`: What penalty to apply if a token is already present in the text so far. Bigger
                values mean the model will be less likely to repeat the same token in the text.
            - `frequency_penalty`: What penalty to apply if a token has already been generated in the text.
                Bigger values mean the model will be less likely to repeat the same token in the text.
            - `logit_bias`: Adds a logit bias to specific tokens. The keys of the dictionary are tokens, and the
                values are the bias to add to those tokens.
        :param timeout:
            Timeout for OpenAI client calls. If not set, it defaults to either the
            `OPENAI_TIMEOUT` environment variable or 30 seconds.
        :param max_retries:
            Maximum number of retries to contact OpenAI after an internal error.
            If not set, it defaults to either the `OPENAI_MAX_RETRIES` environment variable or 5.
        :param tools:
            A list of tools or a Toolset for which the model can prepare calls. This parameter can accept either a
            list of `Tool` objects or a `Toolset` instance.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
        :param http_client_kwargs:
            A dictionary of keyword arguments to configure a custom `httpx.Client` or `httpx.AsyncClient`.
            For more information, see the [HTTPX documentation](https://www.python-httpx.org/api/#client).
        """
        self.api_key = api_key
        self.model = model
        self.generation_kwargs = generation_kwargs or {}
        self.streaming_callback = streaming_callback
        self.api_base_url = api_base_url
        self.organization = organization
        self.timeout = timeout
        self.max_retries = max_retries
        self.tools = tools  # Store tools as-is, whether it's a list or a Toolset
        self.tools_strict = tools_strict
        self.http_client_kwargs = http_client_kwargs
        # Check for duplicate tool names
        _check_duplicate_tool_names(list(self.tools or []))

        if timeout is None:
            timeout = float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
        if max_retries is None:
            max_retries = int(os.environ.get("OPENAI_MAX_RETRIES", "5"))

        client_kwargs: Dict[str, Any] = {
            "api_key": api_key.resolve_value(),
            "organization": organization,
            "base_url": api_base_url,
            "timeout": timeout,
            "max_retries": max_retries,
        }

        self.client = OpenAI(http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_kwargs)
        self.async_client = AsyncOpenAI(
            http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_kwargs
        )

    def _get_telemetry_data(self) -> Dict[str, Any]:
        """
        Data that is sent to Posthog for usage analytics.
        """
        return {"model": self.model}

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize this component to a dictionary.

        :returns:
            The serialized component as a dictionary.
        """
        callback_name = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        return default_to_dict(
            self,
            model=self.model,
            streaming_callback=callback_name,
            api_base_url=self.api_base_url,
            organization=self.organization,
            generation_kwargs=self.generation_kwargs,
            api_key=self.api_key.to_dict(),
            timeout=self.timeout,
            max_retries=self.max_retries,
            tools=serialize_tools_or_toolset(self.tools),
            tools_strict=self.tools_strict,
            http_client_kwargs=self.http_client_kwargs,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "OpenAIChatGenerator":
        """
        Deserialize this component from a dictionary.

        :param data: The dictionary representation of this component.
        :returns:
            The deserialized component instance.
        """
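        # Secrets, tools, and the streaming callback are stored in serialized form
        # by to_dict, so each is rebuilt in place before default_from_dict
        # re-instantiates the component.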
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
        deserialize_tools_or_toolset_inplace(data["init_parameters"], key="tools")
        init_params = data.get("init_parameters", {})
        serialized_callback_handler = init_params.get("streaming_callback")
        if serialized_callback_handler:
            data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)
        return default_from_dict(cls, data)

    @component.output_types(replies=List[ChatMessage])
    def run(
        self,
        messages: List[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[Dict[str, Any]] = None,
        *,
        tools: Optional[Union[List[Tool], Toolset]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Invokes chat completion based on the provided messages and generation parameters.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat/create).
        :param tools:
            A list of tools or a Toolset for which the model can prepare calls. If set, it will override the
            `tools` parameter set during component initialization. This parameter can accept either a list of
            `Tool` objects or a `Toolset` instance.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
        """
        if len(messages) == 0:
            return {"replies": []}

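        # The runtime callback, if given, takes precedence over the one set at init
        # time; requires_async=False ensures the selected callback is synchronous.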
        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
        )

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )
        chat_completion: Union[Stream[ChatCompletionChunk], ChatCompletion] = self.client.chat.completions.create(
            **api_args
        )

        if streaming_callback is not None:
            completions = self._handle_stream_response(
                chat_completion,  # type: ignore
                streaming_callback,  # type: ignore
            )

        else:
            assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
            completions = [
                self._convert_chat_completion_to_chat_message(chat_completion, choice)
                for choice in chat_completion.choices
            ]

        # before returning, do post-processing of the completions
        for message in completions:
            self._check_finish_reason(message.meta)

        return {"replies": completions}

    @component.output_types(replies=List[ChatMessage])
    async def run_async(
        self,
        messages: List[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[Dict[str, Any]] = None,
        *,
        tools: Optional[Union[List[Tool], Toolset]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Asynchronously invokes chat completion based on the provided messages and generation parameters.

        This is the asynchronous version of the `run` method. It has the same parameters and return values
        but can be used with `await` in async code.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
            Must be a coroutine.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/chat/create).
        :param tools:
            A list of tools or a Toolset for which the model can prepare calls. If set, it will override the
            `tools` parameter set during component initialization. This parameter can accept either a list of
            `Tool` objects or a `Toolset` instance.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `True`, the model will follow exactly
            the schema provided in the `parameters` field of the tool definition, but this may increase latency.
            If set, it will override the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
        """
        # validate and select the streaming callback
        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
        )

        if len(messages) == 0:
            return {"replies": []}

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )

        chat_completion: Union[
            AsyncStream[ChatCompletionChunk], ChatCompletion
        ] = await self.async_client.chat.completions.create(**api_args)

        if streaming_callback is not None:
            completions = await self._handle_async_stream_response(
                chat_completion,  # type: ignore
                streaming_callback,  # type: ignore
            )

        else:
            assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
            completions = [
                self._convert_chat_completion_to_chat_message(chat_completion, choice)
                for choice in chat_completion.choices
            ]

        # before returning, do post-processing of the completions
        for message in completions:
            self._check_finish_reason(message.meta)

        return {"replies": completions}

    def _prepare_api_call(  # noqa: PLR0913
        self,
        *,
        messages: List[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[Dict[str, Any]] = None,
        tools: Optional[Union[List[Tool], Toolset]] = None,
        tools_strict: Optional[bool] = None,
    ) -> Dict[str, Any]:
        # update generation kwargs by merging with the generation kwargs passed to the run method
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}

        # adapt ChatMessage(s) to the format expected by the OpenAI API
        openai_formatted_messages = [message.to_openai_dict_format() for message in messages]

        tools = tools or self.tools
        if isinstance(tools, Toolset):
            tools = list(tools)
        tools_strict = tools_strict if tools_strict is not None else self.tools_strict
        _check_duplicate_tool_names(tools)

        openai_tools = {}
        if tools:
            tool_definitions = []
            for t in tools:
                function_spec = {**t.tool_spec}
                if tools_strict:
                    function_spec["strict"] = True
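                    # OpenAI's strict mode requires the JSON schema to forbid
                    # properties that are not explicitly declared.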
                    function_spec["parameters"]["additionalProperties"] = False
                tool_definitions.append({"type": "function", "function": function_spec})
            openai_tools = {"tools": tool_definitions}

        is_streaming = streaming_callback is not None
        num_responses = generation_kwargs.pop("n", 1)
        if is_streaming and num_responses > 1:
            raise ValueError("Cannot stream multiple responses, please set n=1.")

        return {
            "model": self.model,
            "messages": openai_formatted_messages,  # type: ignore[arg-type] # openai expects list of specific message types
            "stream": streaming_callback is not None,
            "n": num_responses,
            **openai_tools,
            **generation_kwargs,
        }

    def _handle_stream_response(self, chat_completion: Stream, callback: SyncStreamingCallbackT) -> List[ChatMessage]:
        chunks: List[StreamingChunk] = []
        chunk = None
        chunk_delta: StreamingChunk

        for chunk in chat_completion:  # pylint: disable=not-an-iterable
            assert len(chunk.choices) <= 1, "Streaming responses should have at most one choice."
            chunk_delta = self._convert_chat_completion_chunk_to_streaming_chunk(chunk)
            chunks.append(chunk_delta)
            callback(chunk_delta)
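        # The last chunk carries stream-level metadata (model name and, if enabled,
        # final usage stats), so it is passed along with the accumulated deltas.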
        return [self._convert_streaming_chunks_to_chat_message(chunk, chunks)]

    async def _handle_async_stream_response(
        self, chat_completion: AsyncStream, callback: AsyncStreamingCallbackT
    ) -> List[ChatMessage]:
        chunks: List[StreamingChunk] = []
        chunk = None
        chunk_delta: StreamingChunk

        async for chunk in chat_completion:  # pylint: disable=not-an-iterable
            assert len(chunk.choices) <= 1, "Streaming responses should have at most one choice."
            chunk_delta = self._convert_chat_completion_chunk_to_streaming_chunk(chunk)
            chunks.append(chunk_delta)
            await callback(chunk_delta)
        return [self._convert_streaming_chunks_to_chat_message(chunk, chunks)]

    def _check_finish_reason(self, meta: Dict[str, Any]) -> None:
        if meta["finish_reason"] == "length":
            logger.warning(
                "The completion for index {index} has been truncated before reaching a natural stopping point. "
                "Increase the max_tokens parameter to allow for longer completions.",
                index=meta["index"],
                finish_reason=meta["finish_reason"],
            )
        if meta["finish_reason"] == "content_filter":
            logger.warning(
                "The completion for index {index} has been truncated due to the content filter.",
                index=meta["index"],
                finish_reason=meta["finish_reason"],
            )

    def _convert_streaming_chunks_to_chat_message(
        self, last_chunk: ChatCompletionChunk, chunks: List[StreamingChunk]
    ) -> ChatMessage:
        """
        Connects the streaming chunks into a single ChatMessage.

        :param last_chunk: The last chunk returned by the OpenAI API.
        :param chunks: The list of all `StreamingChunk` objects.

        :returns: The ChatMessage.
        """
        text = "".join([chunk.content for chunk in chunks])
        tool_calls = []

        # Process tool calls if present in any chunk
        tool_call_data: Dict[int, Dict[str, str]] = {}  # Track tool calls by index
        for chunk_payload in chunks:
            tool_calls_meta = chunk_payload.meta.get("tool_calls")
            if tool_calls_meta is not None:
                for delta in tool_calls_meta:
                    # We use the index of the tool call to track it across chunks since the ID is not always provided
                    if delta.index not in tool_call_data:
                        tool_call_data[delta.index] = {"id": "", "name": "", "arguments": ""}

                    # Save the ID if present
                    if delta.id is not None:
                        tool_call_data[delta.index]["id"] = delta.id

                    if delta.function is not None:
                        if delta.function.name is not None:
                            tool_call_data[delta.index]["name"] += delta.function.name
                        if delta.function.arguments is not None:
                            tool_call_data[delta.index]["arguments"] += delta.function.arguments

        # Convert accumulated tool call data into ToolCall objects
        for call_data in tool_call_data.values():
            try:
                arguments = json.loads(call_data["arguments"])
                tool_calls.append(ToolCall(id=call_data["id"], tool_name=call_data["name"], arguments=arguments))
            except json.JSONDecodeError:
                logger.warning(
                    "OpenAI returned a malformed JSON string for tool call arguments. This tool call "
                    "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                    "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                    _id=call_data["id"],
                    _name=call_data["name"],
                    _arguments=call_data["arguments"],
                )

        # finish_reason can appear in different places so we look for the last one
        finish_reasons = [
            chunk.meta.get("finish_reason") for chunk in chunks if chunk.meta.get("finish_reason") is not None
        ]
        finish_reason = finish_reasons[-1] if finish_reasons else None

        meta = {
            "model": last_chunk.model,
            "index": 0,
            "finish_reason": finish_reason,
            "completion_start_time": chunks[0].meta.get("received_at"),  # first chunk received
            "usage": self._serialize_usage(last_chunk.usage),  # last chunk has the final usage data if available
        }

        return ChatMessage.from_assistant(text=text or None, tool_calls=tool_calls, meta=meta)

    def _convert_chat_completion_to_chat_message(self, completion: ChatCompletion, choice: Choice) -> ChatMessage:
        """
        Converts the non-streaming response from the OpenAI API to a ChatMessage.

        :param completion: The completion returned by the OpenAI API.
        :param choice: The choice returned by the OpenAI API.
        :return: The ChatMessage.
        """
        message: ChatCompletionMessage = choice.message
        text = message.content
        tool_calls = []
        if openai_tool_calls := message.tool_calls:
            for openai_tc in openai_tool_calls:
                arguments_str = openai_tc.function.arguments
                try:
                    arguments = json.loads(arguments_str)
                    tool_calls.append(ToolCall(id=openai_tc.id, tool_name=openai_tc.function.name, arguments=arguments))
                except json.JSONDecodeError:
                    logger.warning(
                        "OpenAI returned a malformed JSON string for tool call arguments. This tool call "
                        "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                        "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                        _id=openai_tc.id,
                        _name=openai_tc.function.name,
                        _arguments=arguments_str,
                    )

        chat_message = ChatMessage.from_assistant(text=text, tool_calls=tool_calls)
        chat_message._meta.update(
            {
                "model": completion.model,
                "index": choice.index,
                "finish_reason": choice.finish_reason,
                "usage": self._serialize_usage(completion.usage),
            }
        )
        return chat_message

    def _convert_chat_completion_chunk_to_streaming_chunk(self, chunk: ChatCompletionChunk) -> StreamingChunk:
        """
        Converts the streaming response chunk from the OpenAI API to a StreamingChunk.

        :param chunk: The chunk returned by the OpenAI API.

        :returns:
            The StreamingChunk.
        """

        # get the component name and type
        component_info = ComponentInfo()
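        # __component_name__ is set externally (for example when the component is
        # added to a Pipeline); the getattr guard below keeps standalone instances,
        # where the attribute is unset, from raising an AttributeError.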
        component_info.name = (
            str(self.__component_name__) if getattr(self, "__component_name__", None) is not None else None
        )
        component_info.type = f"{self.__class__.__module__}.{self.__class__.__name__}"

        # we stream the content of the chunk if it's not a tool or function call
        # if there are no choices, return an empty chunk
        if len(chunk.choices) == 0:
            return StreamingChunk(
                content="",
                meta={"model": chunk.model, "received_at": datetime.now().isoformat()},
                component_info=component_info,
            )

        choice: ChunkChoice = chunk.choices[0]
        content = choice.delta.content or ""
        chunk_message = StreamingChunk(content, component_info=component_info)

        # but save the tool calls and function call in the meta if they are present
        # and then connect the chunks in the _convert_streaming_chunks_to_chat_message method
        chunk_message.meta.update(
            {
                "model": chunk.model,
                "index": choice.index,
                "tool_calls": choice.delta.tool_calls,
                "finish_reason": choice.finish_reason,
                "received_at": datetime.now().isoformat(),
            }
        )
        return chunk_message

    def _serialize_usage(self, usage):
        """Recursively convert an OpenAI usage object to a serializable dict."""
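        # openai-python response objects are Pydantic models, so model_dump() is the
        # usual path; the remaining branches are defensive fallbacks for plain
        # objects, dicts, and lists.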
        if hasattr(usage, "model_dump"):
            return usage.model_dump()
        elif hasattr(usage, "__dict__"):
            return {k: self._serialize_usage(v) for k, v in usage.__dict__.items() if not k.startswith("_")}
        elif isinstance(usage, dict):
            return {k: self._serialize_usage(v) for k, v in usage.items()}
        elif isinstance(usage, list):
            return [self._serialize_usage(item) for item in usage]
        else:
            return usage
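
The feature this pull request adds, a `StreamingChunk` that carries the emitting component's name and type via `ComponentInfo`, can be exercised through a streaming callback. Below is a minimal sketch, assuming `OPENAI_API_KEY` is set in the environment; the `show_chunk` callback is illustrative and not part of the PR:

```python
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage, StreamingChunk


def show_chunk(chunk: StreamingChunk) -> None:
    # component_info is the field this PR introduces; name falls back to None
    # when __component_name__ is unset on the component, type is always populated
    info = chunk.component_info
    source = f"{info.name or '?'} ({info.type})" if info else "unknown"
    print(f"[{source}] {chunk.content!r}")


client = OpenAIChatGenerator(streaming_callback=show_chunk)
result = client.run([ChatMessage.from_user("What's Natural Language Processing?")])
print(result["replies"][0].text)
```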