scope3data / scope3ai-py / build 12840122239

18 Jan 2025 02:37AM UTC. Coverage: 95.824% (+15.3% from 80.557%)

Pull Request #68 (tito, via github): fix(huggingface): fixing aiohttp not working with VCR if passing filename

3 of 3 new or added lines in 2 files covered (100.0%).
45 existing lines in 9 files now uncovered.
2111 of 2203 relevant lines covered (95.82%).
3.83 hits per line.
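
As a quick sanity check on the totals above (a throwaway snippet, not part of the project):

covered, relevant = 2111, 2203
print(f"{covered / relevant:.3%}")  # 95.824%, the overall coverage reported above
print(f"{95.824 - 80.557:+.1f}%")   # +15.3%, the gain over the previous build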

Source file: /scope3ai/tracers/openai/chat.py (95.0% covered)

import base64
import logging
import time
from io import BytesIO
from typing import Any, Callable, Optional, Union

from openai import AsyncStream, Stream
from openai.resources.chat import AsyncCompletions, Completions
from openai.types.chat import ChatCompletion as _ChatCompletion
from openai.types.chat import ChatCompletionChunk as _ChatCompletionChunk

from scope3ai.api.types import ImpactRow, Scope3AIContext
from scope3ai.api.typesgen import Image as RootImage
from scope3ai.constants import PROVIDERS
from scope3ai.lib import Scope3AI

from .utils import MUTAGEN_MAPPING, _get_audio_duration

PROVIDER = PROVIDERS.OPENAI.value
logger = logging.getLogger("scope3ai.tracers.openai.chat")


class ChatCompletion(_ChatCompletion):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(_ChatCompletionChunk):
    scope3ai: Optional[Scope3AIContext] = None


def _openai_aggregate_multimodal_image(content: dict, row: ImpactRow) -> None:
    from PIL import Image

    url = content["image_url"]["url"]
    if url.startswith("data:"):
        # extract the content type and the data part
        # example: data:image/jpeg;base64,....
        content_type, data = url.split(",", 1)
        image_data = BytesIO(base64.b64decode(data))
        image = Image.open(image_data)
        width, height = image.size
        size = RootImage(root=f"{width}x{height}")

        if row.input_images is None:
            row.input_images = [size]
        else:
            row.input_images.append(size)

    else:
        # TODO: not supported yet.
        # Should we actually download the file here just to have the size?
        pass


def _openai_aggregate_multimodal_audio(content: dict, row: ImpactRow) -> None:
    input_audio = content["input_audio"]
    format = input_audio["format"]
    b64data = input_audio["data"]
    assert format in MUTAGEN_MAPPING

    # decode the base64 data
    audio_data = base64.b64decode(b64data)
    # TODO: accept audio duration as float in AiApi
    duration = int(_get_audio_duration(format, audio_data))

    if row.input_audio_seconds is None:
        row.input_audio_seconds = duration
    else:
        row.input_audio_seconds += duration


def _openai_aggregate_multimodal_content(content: dict, row: ImpactRow) -> None:
    try:
        content_type = content.get("type")
        if content_type == "image_url":
            _openai_aggregate_multimodal_image(content, row)
        elif content_type == "input_audio":
            _openai_aggregate_multimodal_audio(content, row)
    except Exception as e:  # uncovered
        logger.error(f"Error processing multimodal content: {e}")  # uncovered


def _openai_aggregate_multimodal(message: dict, row: ImpactRow) -> None:
    # if the message content is not a tuple/list, it is just text,
    # so there is nothing multimodal in it and we can skip it.
    content = message.get("content", [])
    if isinstance(content, (tuple, list)):
        for item in content:
            _openai_aggregate_multimodal_content(item, row)
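
# Illustrative sketch (not part of the original module): the helpers above
# consume OpenAI's multimodal chat message format, in which each non-text
# content part is tagged with a "type". For example (values shortened):
#
#   message = {
#       "role": "user",
#       "content": [
#           {"type": "text", "text": "What is in this image?"},
#           {"type": "image_url",
#            "image_url": {"url": "data:image/png;base64,iVBORw0..."}},
#           {"type": "input_audio",
#            "input_audio": {"format": "mp3", "data": "<base64>"}},
#       ],
#   }
#
# _openai_aggregate_multimodal(message, row) would append the decoded image's
# "WIDTHxHEIGHT" size to row.input_images and add the clip's duration, in
# whole seconds, to row.input_audio_seconds.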

def _openai_chat_wrapper(
    response: Any, request_latency: float, kwargs: dict
) -> ChatCompletion:
    model_requested = kwargs["model"]
    model_used = response.model

    scope3_row = ImpactRow(
        model_id=model_requested,
        model_used_id=model_used,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        request_duration_ms=request_latency * 1000,
        managed_service_id=PROVIDER,
    )

    # analyse any multimodal parts of the request
    messages = kwargs.get("messages", [])
    for message in messages:
        _openai_aggregate_multimodal(message, scope3_row)
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)


def openai_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    return _openai_chat_wrapper(response, request_latency, kwargs)


def openai_chat_wrapper_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> Stream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:  # uncovered
        raise ValueError("stream_options include_usage must be True")  # uncovered

    stream = wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time?
                managed_service_id=PROVIDER,
            )

            scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk

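
# Usage sketch (hypothetical, not part of the original module): once
# Completions.create is routed through openai_chat_wrapper_stream, only the
# final chunk of a stream carries usage data and therefore a scope3ai context.
# With stream_options={"include_usage": True}, the OpenAI API emits that final
# chunk with a populated `usage` field and an empty `choices` list.
#
#   for chunk in client.chat.completions.create(
#       model="gpt-4o-mini", messages=messages, stream=True
#   ):
#       if getattr(chunk, "scope3ai", None) is not None:
#           print(chunk.scope3ai)  # impact context for the whole request
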
async def openai_async_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    return _openai_chat_wrapper(response, request_latency, kwargs)


async def openai_async_chat_wrapper_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> AsyncStream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:  # uncovered
        raise ValueError("stream_options include_usage must be True")  # uncovered

    stream = await wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    async for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time?
                managed_service_id=PROVIDER,
            )

            scope3_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk


async def openai_async_chat_wrapper(
    wrapped: Callable,
    instance: AsyncCompletions,
    args: Any,
    kwargs: Any,
) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await openai_async_chat_wrapper_non_stream(
            wrapped, instance, args, kwargs
        )


def openai_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return openai_chat_wrapper_non_stream(wrapped, instance, args, kwargs)
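
The (wrapped, instance, args, kwargs) signatures above follow the wrapt wrapper convention, so attaching these tracers could look like the sketch below. The use of wrapt and the exact module paths are assumptions for illustration, not necessarily how scope3ai-py wires them up.

import wrapt

# Route the sync and async chat-completion entry points through the tracers.
wrapt.wrap_function_wrapper(
    "openai.resources.chat.completions", "Completions.create", openai_chat_wrapper
)
wrapt.wrap_function_wrapper(
    "openai.resources.chat.completions",
    "AsyncCompletions.create",
    openai_async_chat_wrapper,
)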