
scope3data / scope3ai-py, build 13040707031 (github)
29 Jan 2025 09:23PM UTC. Coverage: 96.412% (+15.9%) from 80.557%.

Pull Request #84: feat: multimodal output audio for OpenAI and LiteLLM
Commit 24322d by kevdevg: "feat: multimodal output for openai/litellm"

33 of 34 new or added lines in 3 files covered (97.06%).
54 existing lines in 10 files are now uncovered.
2472 of 2564 relevant lines covered (96.41%), 3.85 hits per line.

Source file: /scope3ai/tracers/openai/chat.py (96.3% covered)
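The file below wraps OpenAI chat-completion calls so that token usage, request latency, and (new in this PR) audio output content are reported to Scope3AI as ImpactRow records. For orientation, a request that exercises the new audio output path would pass the modalities and audio parameters that _openai_chat_wrapper inspects. The model name and voice below are illustrative only, not taken from the repository or its tests:

    from openai import OpenAI

    client = OpenAI()
    # modalities/audio are standard OpenAI chat parameters for audio-capable models;
    # the tracer reads kwargs["modalities"] and kwargs["audio"]["format"] from them.
    response = client.chat.completions.create(
        model="gpt-4o-audio-preview",  # illustrative audio-capable model
        modalities=["text", "audio"],
        audio={"voice": "alloy", "format": "wav"},
        messages=[{"role": "user", "content": "Say hello."}],
    )

Once the tracer is installed (see the sketch after the listing), the returned object is the ChatCompletion subclass defined below, with the submitted impact context attached as response.scope3ai.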
import logging
import time
from typing import Any, Callable, Optional, Union

from openai import AsyncStream, Stream
from openai._legacy_response import LegacyAPIResponse as _LegacyAPIResponse
from openai.resources.chat import AsyncCompletions, Completions
from openai.types.chat import ChatCompletion as _ChatCompletion
from openai.types.chat import ChatCompletionChunk as _ChatCompletionChunk

from scope3ai.api.types import ImpactRow, Scope3AIContext
from scope3ai.constants import PROVIDERS
from scope3ai.lib import Scope3AI
from scope3ai.tracers.utils.multimodal import (
    aggregate_multimodal,
    aggregate_multimodal_audio_content_output,
)

PROVIDER = PROVIDERS.OPENAI.value

logger = logging.getLogger("scope3ai.tracers.openai.chat")


class LegacyApiResponse(_LegacyAPIResponse):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletion(_ChatCompletion):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(_ChatCompletionChunk):
    scope3ai: Optional[Scope3AIContext] = None


def _openai_chat_wrapper(
    response: Any, request_latency: float, kwargs: dict
) -> ChatCompletion:
    model_requested = kwargs.get("model")
    modalities = kwargs.get("modalities", [])
    if type(response) is _LegacyAPIResponse:
        # Legacy (raw) API responses expose the payload as a plain dict.
        http_response = response.http_response.json()
        model_used = http_response.get("model")
        scope3_row = ImpactRow(
            model_id=model_requested,
            model_used_id=model_used,
            input_tokens=http_response.get("usage", {}).get("prompt_tokens"),
            output_tokens=http_response.get("usage", {}).get("completion_tokens"),
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        if "audio" in modalities:
            audio_format = kwargs.get("audio", {}).get("format", "mp3")
            for choice in http_response.get("choices", []):
                audio_data = choice.get("message", {}).get("audio", {})
                if audio_data:
                    audio_content = audio_data.get("data")
                    aggregate_multimodal_audio_content_output(
                        audio_content, audio_format, scope3_row
                    )

        messages = kwargs.get("messages", [])
        for message in messages:
            aggregate_multimodal(message, scope3_row, logger)
        scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
        setattr(response, "scope3ai", scope3ai_ctx)
        return response
    else:
        # Typed ChatCompletion responses.
        scope3_row = ImpactRow(
            model_id=model_requested,
            model_used_id=response.model,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        if "audio" in modalities:
            audio_format = kwargs.get("audio", {}).get("format", "mp3")
            for choice in response.choices:
                audio_data = getattr(choice.message, "audio", None)
                if audio_data:
                    audio_content = audio_data.data
                    aggregate_multimodal_audio_content_output(
                        audio_content, audio_format, scope3_row
                    )

        messages = kwargs.get("messages", [])
        for message in messages:
            aggregate_multimodal(message, scope3_row, logger)
        scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)


def openai_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    return _openai_chat_wrapper(response, request_latency, kwargs)


def openai_chat_wrapper_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> Stream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:  # uncovered in this build
        raise ValueError("stream_options include_usage must be True")

    stream = wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time
                managed_service_id=PROVIDER,
            )

            scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk


async def openai_async_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    return _openai_chat_wrapper(response, request_latency, kwargs)


async def openai_async_chat_wrapper_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> AsyncStream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:  # uncovered in this build
        raise ValueError("stream_options include_usage must be True")

    stream = await wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    async for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time
                managed_service_id=PROVIDER,
            )

            scope3_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk


async def openai_async_chat_wrapper(
    wrapped: Callable,
    instance: AsyncCompletions,
    args: Any,
    kwargs: Any,
) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await openai_async_chat_wrapper_non_stream(
            wrapped, instance, args, kwargs
        )


def openai_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return openai_chat_wrapper_non_stream(wrapped, instance, args, kwargs)
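These wrapper functions follow the wrapt (wrapped, instance, args, kwargs) convention, so the instrumentation that installs them lives elsewhere in scope3ai, not in this file. A minimal sketch of how such patching could look, assuming wrapt and the openai v1 module layout; the actual patch targets used by scope3ai may differ:

    import wrapt

    from scope3ai.tracers.openai.chat import (
        openai_async_chat_wrapper,
        openai_chat_wrapper,
    )

    # Route the sync and async chat-completion entry points through the tracers
    # defined above. The module path is an assumption based on the openai v1 layout.
    wrapt.wrap_function_wrapper(
        "openai.resources.chat.completions", "Completions.create", openai_chat_wrapper
    )
    wrapt.wrap_function_wrapper(
        "openai.resources.chat.completions",
        "AsyncCompletions.create",
        openai_async_chat_wrapper,
    )

For streaming calls, openai_chat_wrapper_stream and its async counterpart force stream_options={"include_usage": True} so the final chunk carries usage; that chunk is re-emitted as the ChatCompletionChunk subclass with scope3ai attached, while all other chunks pass through unchanged.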