
scope3data / scope3ai-py, build 13416015178

19 Feb 2025 03:18PM UTC coverage: 96.179% (+15.6%) from 80.557%

Pull Request #91: docs: minor readme edits
Merge b16436a44 into 37d564f57
Commit: 404fae, committer: web-flow (github)

2542 of 2643 relevant lines covered (96.18%)
3.84 hits per line

Source file: /scope3ai/tracers/openai/chat.py (96.43% covered)
import logging
import time
from typing import Any, Callable, Optional, Tuple, Union

from openai import AsyncStream, Stream
from openai._legacy_response import LegacyAPIResponse as _LegacyAPIResponse
from openai.resources.chat import AsyncCompletions, Completions
from openai.types.chat import ChatCompletion as _ChatCompletion
from openai.types.chat import ChatCompletionChunk as _ChatCompletionChunk

from scope3ai.api.types import ImpactRow, Scope3AIContext
from scope3ai.lib import Scope3AI
from scope3ai.tracers.utils.multimodal import (
    aggregate_multimodal,
    aggregate_multimodal_audio_content_output,
)


logger = logging.getLogger("scope3ai.tracers.openai.chat")


# Response subclasses that carry the Scope3AI impact context alongside the
# original OpenAI payload.
class LegacyApiResponse(_LegacyAPIResponse):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletion(_ChatCompletion):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(_ChatCompletionChunk):
    scope3ai: Optional[Scope3AIContext] = None

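# Shared helper: normalize either a legacy HTTP response or a parsed
# ChatCompletion into an ImpactRow (model ids, token counts, latency),
# including audio output and multimodal input accounting.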

def _openai_chat_wrapper(
    response: Any, request_latency: float, kwargs: dict
) -> Tuple[Union[_LegacyAPIResponse, ChatCompletion], ImpactRow]:
    model_requested = kwargs.get("model")
    modalities = kwargs.get("modalities", [])
    if type(response) is _LegacyAPIResponse:
        http_response = response.http_response.json()
        model_used = http_response.get("model")
        scope3_row = ImpactRow(
            model_id=model_requested,
            model_used_id=model_used,
            input_tokens=http_response.get("usage", {}).get("prompt_tokens"),
            output_tokens=http_response.get("usage", {}).get("completion_tokens"),
            request_duration_ms=request_latency * 1000,
        )
        if "audio" in modalities:
            audio_format = kwargs.get("audio", {}).get("format", "mp3")
            for choice in http_response.get("choices", []):
                audio_data = choice.get("message", {}).get("audio", {})
                if audio_data:
                    audio_content = audio_data.get("data")
                    aggregate_multimodal_audio_content_output(
                        audio_content, audio_format, scope3_row
                    )

        messages = kwargs.get("messages", [])
        for message in messages:
            aggregate_multimodal(message, scope3_row, logger)
        return response, scope3_row
    else:
        scope3_row = ImpactRow(
            model_id=model_requested,
            model_used_id=response.model,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            request_duration_ms=request_latency * 1000,
        )
        if "audio" in modalities:
            audio_format = kwargs.get("audio", {}).get("format", "mp3")
            for choice in response.choices:
                audio_data = getattr(choice.message, "audio", None)
                if audio_data:
                    audio_content = audio_data.data
                    aggregate_multimodal_audio_content_output(
                        audio_content, audio_format, scope3_row
                    )

        messages = kwargs.get("messages", [])
        for message in messages:
            aggregate_multimodal(message, scope3_row, logger)
        return response, scope3_row

    # analyse multimodal part

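# Synchronous, non-streaming wrapper: time the call, submit the impact row,
# and attach the resulting Scope3AIContext to the returned response.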

def openai_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    response, scope3_row = _openai_chat_wrapper(response, request_latency, kwargs)
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    if isinstance(response, _LegacyAPIResponse):
        setattr(response, "scope3ai", scope3ai_ctx)
        return response
    else:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)

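# Synchronous streaming wrapper: force stream_options.include_usage so the
# final chunk carries token usage, then attach the impact context to it.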

def openai_chat_wrapper_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> Stream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:
        raise ValueError("stream_options include_usage must be True")

    stream = wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time
            )

            scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk

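# Async, non-streaming wrapper: same flow as the sync version, awaiting the
# wrapped call and submitting the impact row via asubmit_impact.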

async def openai_async_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    response, scope3_row = _openai_chat_wrapper(response, request_latency, kwargs)
    scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
    if isinstance(response, _LegacyAPIResponse):
        setattr(response, "scope3ai", scope3ai_ctx)
        return response
    else:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)

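# Async streaming wrapper: mirrors the sync streaming path as an async
# generator, submitting the impact row via asubmit_impact.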

async def openai_async_chat_wrapper_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> AsyncStream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:
        raise ValueError("stream_options include_usage must be True")

    stream = await wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    async for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time
            )

            scope3_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk

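# Entry points: dispatch to the streaming or non-streaming wrapper based on
# the "stream" keyword argument.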

async def openai_async_chat_wrapper(
    wrapped: Callable,
    instance: AsyncCompletions,
    args: Any,
    kwargs: Any,
) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await openai_async_chat_wrapper_non_stream(
            wrapped, instance, args, kwargs
        )


def openai_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return openai_chat_wrapper_non_stream(wrapped, instance, args, kwargs)
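
The (wrapped, instance, args, kwargs) signatures above follow the wrapt wrapper convention. The sketch below shows one way such wrappers are typically installed; the use of wrapt.wrap_function_wrapper and the exact patch targets are assumptions for illustration, not taken from this file.

import wrapt

from scope3ai.tracers.openai.chat import (
    openai_async_chat_wrapper,
    openai_chat_wrapper,
)

# Route chat completion calls through the tracer wrappers so an ImpactRow is
# recorded per request (patch targets are assumed, not confirmed by this file).
wrapt.wrap_function_wrapper(
    "openai.resources.chat.completions", "Completions.create", openai_chat_wrapper
)
wrapt.wrap_function_wrapper(
    "openai.resources.chat.completions",
    "AsyncCompletions.create",
    openai_async_chat_wrapper,
)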