scope3data / scope3ai-py / 12753874046

13 Jan 2025 06:40PM UTC coverage: 95.076% (+14.5%) from 80.557%

Pull Request #61: feat(Hugging face): Vision methods - image classification / image segmentation / object detection
Commit 3a8d3f by kevdevg (via github): fix: vision pillow read bytes

179 of 189 new or added lines in 5 files covered. (94.71%)

34 existing lines in 9 files now uncovered.

2008 of 2112 relevant lines covered (95.08%)

3.8 hits per line

Source File

/scope3ai/tracers/openai/chat.py (86.9% covered)

Coverage markers: the uncovered lines (×) are the openai ImportError fallback assignments and the two include_usage ValueError branches; the elif guard in the async stream wrapper is flagged as newly uncovered (UNCOV). Every other relevant line in this file records 4 hits.

import time
from typing import Any, Callable, Optional, Union

from scope3ai.constants import PROVIDERS
from scope3ai.lib import Scope3AI
from scope3ai.api.types import Scope3AIContext, Model, ImpactRow

try:
    from openai import AsyncStream, Stream
    from openai.resources.chat import AsyncCompletions, Completions
    from openai.types.chat import ChatCompletion as _ChatCompletion
    from openai.types.chat import ChatCompletionChunk as _ChatCompletionChunk
except ImportError:
    AsyncStream = object()
    Stream = object()
    AsyncCompletions = object()
    Completions = object()
    _ChatCompletion = object()
    _ChatCompletionChunk = object()


PROVIDER = PROVIDERS.OPENAI.value


class ChatCompletion(_ChatCompletion):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(_ChatCompletionChunk):
    scope3ai: Optional[Scope3AIContext] = None


def openai_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return openai_chat_wrapper_non_stream(wrapped, instance, args, kwargs)


def openai_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start

    model_requested = kwargs["model"]
    model_used = response.model

    scope3_row = ImpactRow(
        model=Model(id=model_requested),
        model_used=Model(id=model_used),
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        request_duration_ms=request_latency * 1000,
        managed_service_id=PROVIDER,
    )

    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)


def openai_chat_wrapper_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> Stream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:
        raise ValueError("stream_options include_usage must be True")

    stream = wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model=Model(id=model_requested),
                model_used=Model(id=model_used),
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time
                managed_service_id=PROVIDER,
            )

            scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk


async def openai_async_chat_wrapper(
    wrapped: Callable,
    instance: AsyncCompletions,
    args: Any,
    kwargs: Any,
) -> Union[ChatCompletion, AsyncStream[ChatCompletionChunk]]:
    if kwargs.get("stream", False):
        return openai_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await openai_async_chat_wrapper_base(wrapped, instance, args, kwargs)


async def openai_async_chat_wrapper_base(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    model_requested = kwargs["model"]
    model_used = response.model

    scope3_row = ImpactRow(
        model=Model(id=model_requested),
        model_used=Model(id=model_used),
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        request_duration_ms=request_latency
        * 1000,  # TODO: can we get the header that has the processing time
        managed_service_id=PROVIDER,
    )

    scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    return ChatCompletion(**response.model_dump(), scope3ai=scope3_ctx)


async def openai_async_chat_wrapper_stream(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> AsyncStream[ChatCompletionChunk]:
    timer_start = time.perf_counter()
    if "stream_options" not in kwargs:
        kwargs["stream_options"] = {}
    if "include_usage" not in kwargs["stream_options"]:
        kwargs["stream_options"]["include_usage"] = True
    elif not kwargs["stream_options"]["include_usage"]:
        raise ValueError("stream_options include_usage must be True")

    stream = await wrapped(*args, **kwargs)
    model_requested = kwargs["model"]

    async for chunk in stream:
        request_latency = time.perf_counter() - timer_start

        if chunk.usage is not None:
            model_used = chunk.model

            scope3_row = ImpactRow(
                model=Model(id=model_requested),
                model_used=Model(id=model_used),
                input_tokens=chunk.usage.prompt_tokens,
                output_tokens=chunk.usage.completion_tokens,
                request_duration_ms=request_latency
                * 1000,  # TODO: can we get the header that has the processing time
                managed_service_id=PROVIDER,
            )

            scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3_ctx)
        else:
            yield chunk