scope3data / scope3ai-py · build 13167091062 (github)

05 Feb 2025 09:40PM UTC · coverage: 96.145% (+15.6%) from 80.557%

Pull Request #87: fix(tracer): tracer impact duplication fix
264133 · kevdevg · fix: fixing mistral stream

9 of 9 new or added lines in 1 file covered (100.0%).
46 existing lines in 10 files now uncovered.
2569 of 2672 relevant lines covered (96.15%), 3.84 hits per line.

Source File: /scope3ai/tracers/openai/chat.py · 96.49% covered

import logging
import time
from typing import Any, Callable, Optional, Union

from openai import AsyncStream, Stream
from openai._legacy_response import LegacyAPIResponse as _LegacyAPIResponse
from openai.resources.chat import AsyncCompletions, Completions
from openai.types.chat import ChatCompletion as _ChatCompletion
from openai.types.chat import ChatCompletionChunk as _ChatCompletionChunk

from scope3ai.api.types import ImpactRow, Scope3AIContext
from scope3ai.constants import PROVIDERS
from scope3ai.lib import Scope3AI
from scope3ai.tracers.utils.multimodal import (
    aggregate_multimodal,
    aggregate_multimodal_audio_content_output,
)

PROVIDER = PROVIDERS.OPENAI.value

logger = logging.getLogger("scope3ai.tracers.openai.chat")


class LegacyApiResponse(_LegacyAPIResponse):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletion(_ChatCompletion):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(_ChatCompletionChunk):
    scope3ai: Optional[Scope3AIContext] = None


def _openai_chat_wrapper(
    response: Any, request_latency: float, kwargs: dict
) -> Union[_LegacyAPIResponse, ChatCompletion, ImpactRow]:
    model_requested = kwargs.get("model")
    modalities = kwargs.get("modalities", [])
    if type(response) is _LegacyAPIResponse:
        http_response = response.http_response.json()
        model_used = http_response.get("model")
        scope3_row = ImpactRow(
            model_id=model_requested,
            model_used_id=model_used,
            input_tokens=http_response.get("usage", {}).get("prompt_tokens"),
            output_tokens=http_response.get("usage", {}).get("completion_tokens"),
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        if "audio" in modalities:
            audio_format = kwargs.get("audio", {}).get("format", "mp3")
            for choice in http_response.get("choices", []):
                audio_data = choice.get("message", {}).get("audio", {})
                if audio_data:
                    audio_content = audio_data.get("data")
                    aggregate_multimodal_audio_content_output(
                        audio_content, audio_format, scope3_row
                    )

        messages = kwargs.get("messages", [])
        for message in messages:
            aggregate_multimodal(message, scope3_row, logger)
        return response, scope3_row
    else:
        scope3_row = ImpactRow(
            model_id=model_requested,
            model_used_id=response.model,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        if "audio" in modalities:
            audio_format = kwargs.get("audio", {}).get("format", "mp3")
            for choice in response.choices:
                audio_data = getattr(choice.message, "audio")
                if audio_data:
                    audio_content = audio_data.data
                    aggregate_multimodal_audio_content_output(
                        audio_content, audio_format, scope3_row
                    )

        messages = kwargs.get("messages", [])
        for message in messages:
            aggregate_multimodal(message, scope3_row, logger)
        return response, scope3_row

    # analyse multimodal part


def openai_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    response, scope3_row = _openai_chat_wrapper(response, request_latency, kwargs)
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    if isinstance(response, _LegacyAPIResponse):
        setattr(response, "scope3ai", scope3ai_ctx)
        return response
    else:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)


def openai_chat_wrapper_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> Stream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:  # uncovered in this build
        raise ValueError("stream_options include_usage must be True")  # uncovered in this build

    stream = wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time
                managed_service_id=PROVIDER,
            )

            scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk


async def openai_async_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    response, scope3_row = _openai_chat_wrapper(response, request_latency, kwargs)
    scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
    if isinstance(response, _LegacyAPIResponse):
        setattr(response, "scope3ai", scope3ai_ctx)
        return response
    else:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)


async def openai_async_chat_wrapper_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> AsyncStream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:  # uncovered in this build
        raise ValueError("stream_options include_usage must be True")  # uncovered in this build

    stream = await wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    async for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time
                managed_service_id=PROVIDER,
            )

            scope3_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk


async def openai_async_chat_wrapper(
    wrapped: Callable,
    instance: AsyncCompletions,
    args: Any,
    kwargs: Any,
) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await openai_async_chat_wrapper_non_stream(
            wrapped, instance, args, kwargs
        )


def openai_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return openai_chat_wrapper_non_stream(wrapped, instance, args, kwargs)
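
The wrappers in this file follow the (wrapped, instance, args, kwargs) convention used by wrapt: each one times the underlying call, builds an ImpactRow from the reported token usage, submits it via Scope3AI.get_instance(), and re-exposes the result with a scope3ai context attached. The sketch below is a hypothetical illustration, not taken from this file, of how such a wrapper could be attached to the OpenAI client and how the context would surface on responses; the wrap_function_wrapper call, the patched module path, and Scope3AI.init() are assumptions for illustration only.

# Minimal usage sketch (assumptions noted inline); Python.
from wrapt import wrap_function_wrapper

from openai import OpenAI
from scope3ai.lib import Scope3AI
from scope3ai.tracers.openai.chat import openai_chat_wrapper

# Assumption: patch the sync chat-completions entry point so that every call goes
# through the wrapper defined above (the wrapper receives wrapped, instance, args, kwargs).
wrap_function_wrapper(
    "openai.resources.chat.completions", "Completions.create", openai_chat_wrapper
)

# Assumption: some initialization call exists; only get_instance()/submit_impact()
# appear in the traced file itself.
Scope3AI.init()

client = OpenAI()

# Non-streaming: the wrapper returns a ChatCompletion subclass carrying .scope3ai.
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.scope3ai)  # Scope3AIContext for the submitted ImpactRow

# Streaming: include_usage is forced on, so the final usage chunk carries token
# counts; only that chunk is re-wrapped with a .scope3ai attribute.
for chunk in client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
):
    ctx = getattr(chunk, "scope3ai", None)
    if ctx is not None:
        print(ctx)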