scope3data / scope3ai-py, build 14097016956
27 Mar 2025 01:41AM UTC coverage: 96.23% (+15.7%) from 80.557%

Pull Request #92: feat: Managed Service Kebabs
Commit 5758a3 (github) by dearlordylord: feat(api): client-to-provider dry

53 of 55 new or added lines in 11 files covered (96.36%).
44 existing lines in 10 files now uncovered.
2578 of 2679 relevant lines covered (96.23%).
3.85 hits per line.

Source File: /scope3ai/tracers/openai/chat.py (96.46% covered)

import logging
import time
from typing import Any, Callable, Optional, Union

from openai import AsyncStream, Stream
from openai._legacy_response import LegacyAPIResponse as _LegacyAPIResponse
from openai.resources.chat import AsyncCompletions, Completions
from openai.types.chat import ChatCompletion as _ChatCompletion
from openai.types.chat import ChatCompletionChunk as _ChatCompletionChunk

from scope3ai.api.types import ImpactRow, Scope3AIContext
from scope3ai.lib import Scope3AI
from scope3ai.constants import CLIENTS, try_provider_for_client
from scope3ai.tracers.utils.multimodal import (
    aggregate_multimodal,
    aggregate_multimodal_audio_content_output,
)


logger = logging.getLogger("scope3ai.tracers.openai.chat")


class LegacyApiResponse(_LegacyAPIResponse):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletion(_ChatCompletion):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(_ChatCompletionChunk):
    scope3ai: Optional[Scope3AIContext] = None


# Shared helper for the non-streaming wrappers: builds the ImpactRow for a
# completed chat call and returns it together with the original response.
def _openai_chat_wrapper(
    response: Any, request_latency: float, kwargs: dict
) -> tuple[Union[_LegacyAPIResponse, _ChatCompletion], ImpactRow]:
    model_requested = kwargs.get("model")
    modalities = kwargs.get("modalities", [])
    if type(response) is _LegacyAPIResponse:
        http_response = response.http_response.json()
        model_used = http_response.get("model")
        scope3_row = ImpactRow(
            managed_service_id=try_provider_for_client(CLIENTS.OPENAI),
            model_id=model_requested,
            model_used_id=model_used,
            input_tokens=http_response.get("usage", {}).get("prompt_tokens"),
            output_tokens=http_response.get("usage", {}).get("completion_tokens"),
            request_duration_ms=request_latency * 1000,
        )
        if "audio" in modalities:
            audio_format = kwargs.get("audio", {}).get("format", "mp3")
            for choice in http_response.get("choices", []):
                audio_data = choice.get("message", {}).get("audio", {})
                if audio_data:
                    audio_content = audio_data.get("data")
                    aggregate_multimodal_audio_content_output(
                        audio_content, audio_format, scope3_row
                    )

        messages = kwargs.get("messages", [])
        for message in messages:
            aggregate_multimodal(message, scope3_row, logger)
        return response, scope3_row
    else:
        scope3_row = ImpactRow(
            managed_service_id=try_provider_for_client(CLIENTS.OPENAI),
            model_id=model_requested,
            model_used_id=response.model,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            request_duration_ms=request_latency * 1000,
        )
        if "audio" in modalities:
            audio_format = kwargs.get("audio", {}).get("format", "mp3")
            for choice in response.choices:
                audio_data = getattr(choice.message, "audio")
                if audio_data:
                    audio_content = audio_data.data
                    aggregate_multimodal_audio_content_output(
                        audio_content, audio_format, scope3_row
                    )

        messages = kwargs.get("messages", [])
        for message in messages:
            aggregate_multimodal(message, scope3_row, logger)
        return response, scope3_row

    # analyse multimodal part


def openai_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    response, scope3_row = _openai_chat_wrapper(response, request_latency, kwargs)
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    if isinstance(response, _LegacyAPIResponse):
        setattr(response, "scope3ai", scope3ai_ctx)
        return response
    else:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)


def openai_chat_wrapper_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> Stream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    # Force include_usage so the final streamed chunk carries token usage.
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:  # uncovered in this report
        raise ValueError("stream_options include_usage must be True")  # uncovered

    stream = wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                managed_service_id=try_provider_for_client(CLIENTS.OPENAI),
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time
            )

            scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk


async def openai_async_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    response, scope3_row = _openai_chat_wrapper(response, request_latency, kwargs)
    scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
    if isinstance(response, _LegacyAPIResponse):
        setattr(response, "scope3ai", scope3ai_ctx)
        return response
    else:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)


async def openai_async_chat_wrapper_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> AsyncStream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    # Force include_usage so the final streamed chunk carries token usage.
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:  # uncovered in this report
        raise ValueError("stream_options include_usage must be True")  # uncovered

    stream = await wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    async for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                managed_service_id=try_provider_for_client(CLIENTS.OPENAI),
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time
            )

            scope3_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk


async def openai_async_chat_wrapper(
    wrapped: Callable,
    instance: AsyncCompletions,
    args: Any,
    kwargs: Any,
) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await openai_async_chat_wrapper_non_stream(
            wrapped, instance, args, kwargs
        )


def openai_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return openai_chat_wrapper_non_stream(wrapped, instance, args, kwargs)
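
The (wrapped, instance, args, kwargs) signature of openai_chat_wrapper and openai_async_chat_wrapper follows the convention of wrapt-style function wrappers. As a minimal sketch of how such tracers could be attached, assuming wrapt is the patching mechanism and that the OpenAI SDK exposes Completions.create in openai.resources.chat.completions (both are assumptions; the actual registration code is outside this file):

import wrapt

from scope3ai.tracers.openai.chat import (
    openai_async_chat_wrapper,
    openai_chat_wrapper,
)

# Sketch only: the module path and patching mechanism are assumptions, not
# taken from this file. Each call to Completions.create is timed by the
# wrapper and an ImpactRow is submitted alongside the normal OpenAI response.
wrapt.wrap_function_wrapper(
    "openai.resources.chat.completions", "Completions.create", openai_chat_wrapper
)
wrapt.wrap_function_wrapper(
    "openai.resources.chat.completions",
    "AsyncCompletions.create",
    openai_async_chat_wrapper,
)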