
scope3data / scope3ai-py / build 12920205472

23 Jan 2025 01:12AM UTC. Coverage: 95.993% (+15.4%) from 80.557%.

Pull Request #74: feat(metadata): include many metadata accessible at global or tracer level
Commit 9af6ef by tito ("fix: try fixing test again"), via github.

49 of 50 new or added lines in 4 files covered (98.0%).
53 existing lines in 10 files are now uncovered.
2204 of 2296 relevant lines covered (95.99%).
3.84 hits per line.

Source File: /scope3ai/tracers/openai/chat.py (95.74% covered; lines not hit in this build are marked "# uncovered")
import logging
import time
from typing import Any, Callable, Optional, Union

from openai import AsyncStream, Stream
from openai._legacy_response import LegacyAPIResponse as _LegacyAPIResponse
from openai.resources.chat import AsyncCompletions, Completions
from openai.types.chat import ChatCompletion as _ChatCompletion
from openai.types.chat import ChatCompletionChunk as _ChatCompletionChunk

from scope3ai.api.types import ImpactRow, Scope3AIContext
from scope3ai.constants import PROVIDERS
from scope3ai.lib import Scope3AI
from scope3ai.tracers.utils.multimodal import aggregate_multimodal

PROVIDER = PROVIDERS.OPENAI.value

logger = logging.getLogger("scope3ai.tracers.openai.chat")


class LegacyApiResponse(_LegacyAPIResponse):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletion(_ChatCompletion):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(_ChatCompletionChunk):
    scope3ai: Optional[Scope3AIContext] = None


def _openai_chat_wrapper(
    response: Any, request_latency: float, kwargs: dict
) -> ChatCompletion:
    model_requested = kwargs["model"]
    if type(response) is _LegacyAPIResponse:
        http_response = response.http_response.json()
        model_used = http_response.get("model")
        scope3_row = ImpactRow(
            model_id=model_requested,
            model_used_id=model_used,
            input_tokens=http_response.get("usage").get("prompt_tokens"),
            output_tokens=http_response.get("usage").get("completion_tokens"),
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        # analyse the multimodal parts of the request messages
        messages = kwargs.get("messages", [])
        for message in messages:
            aggregate_multimodal(message, scope3_row, logger)
        scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
        setattr(response, "scope3ai", scope3ai_ctx)
        return response
    else:
        model_used = response.model
        scope3_row = ImpactRow(
            model_id=model_requested,
            model_used_id=model_used,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        # analyse the multimodal parts of the request messages
        messages = kwargs.get("messages", [])
        for message in messages:
            aggregate_multimodal(message, scope3_row, logger)
        scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)


def openai_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    return _openai_chat_wrapper(response, request_latency, kwargs)


def openai_chat_wrapper_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> Stream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:  # uncovered
        raise ValueError("stream_options include_usage must be True")  # uncovered

    stream = wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time?
                managed_service_id=PROVIDER,
            )

            scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk


async def openai_async_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    return _openai_chat_wrapper(response, request_latency, kwargs)


async def openai_async_chat_wrapper_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> AsyncStream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:  # uncovered
        raise ValueError("stream_options include_usage must be True")  # uncovered

    stream = await wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    async for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model_id=model_requested,
                model_used_id=model_used,
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time?
                managed_service_id=PROVIDER,
            )

            scope3_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk


async def openai_async_chat_wrapper(
    wrapped: Callable,
    instance: AsyncCompletions,
    args: Any,
    kwargs: Any,
) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await openai_async_chat_wrapper_non_stream(
            wrapped, instance, args, kwargs
        )


def openai_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return openai_chat_wrapper_non_stream(wrapped, instance, args, kwargs)
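
For orientation, here is a minimal usage sketch of how a caller might read the impact context these wrappers attach. The initialisation call (Scope3AI.init()) and the client setup are assumptions not confirmed by this file; only Scope3AI.get_instance(), submit_impact(), and the scope3ai attribute appear in the code above.

    # Hypothetical sketch: Scope3AI.init() and the patching behaviour are assumed;
    # the scope3ai attribute and Scope3AIContext come from the wrappers above.
    from openai import OpenAI
    from scope3ai.lib import Scope3AI

    scope3 = Scope3AI.init()  # assumed initialiser that installs the tracers
    client = OpenAI()

    # Non-streaming: the wrapped create() returns a ChatCompletion subclass
    # carrying a Scope3AIContext on .scope3ai (see _openai_chat_wrapper).
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}],
    )
    print(response.scope3ai)  # Scope3AIContext submitted by the tracer, or None

    # Streaming: the tracer forces stream_options.include_usage = True, and only
    # the chunk that carries usage data is re-wrapped with a scope3ai context.
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
    )
    for chunk in stream:
        ctx = getattr(chunk, "scope3ai", None)
        if ctx is not None:
            print("impact context attached to the usage chunk:", ctx)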