• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scope3data / scope3ai-py / 14097016956

27 Mar 2025 01:41AM UTC coverage: 96.23% (+15.7%) from 80.557%
14097016956

Pull #92

github

5758a3
dearlordylord
feat(api): client-to-provider dry
Pull Request #92: feat: Managed Service Kebabs

53 of 55 new or added lines in 11 files covered. (96.36%)

44 existing lines in 10 files now uncovered.

2578 of 2679 relevant lines covered (96.23%)

3.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.06
/scope3ai/tracers/anthropic/chat.py
1
import time
4✔
2
from collections.abc import AsyncIterator, Awaitable, Iterator
4✔
3
from types import TracebackType
4✔
4
from typing import Any, Callable, Generic, Optional, TypeVar, Union
4✔
5

6
from anthropic import Anthropic, AsyncAnthropic
4✔
7
from anthropic import Stream as _Stream, AsyncStream as _AsyncStream
4✔
8
from anthropic._streaming import _T
4✔
9
from anthropic.lib.streaming import AsyncMessageStream as _AsyncMessageStream
4✔
10
from anthropic.lib.streaming import MessageStream as _MessageStream
4✔
11
from anthropic.types import Message as _Message
4✔
12
from anthropic.types.message_delta_event import MessageDeltaEvent
4✔
13
from anthropic.types.message_start_event import MessageStartEvent
4✔
14
from anthropic.types.message_stop_event import MessageStopEvent
4✔
15
from anthropic.types.raw_message_delta_event import RawMessageDeltaEvent
4✔
16
from anthropic.types.raw_message_start_event import RawMessageStartEvent
4✔
17
from anthropic.types.raw_message_stream_event import RawMessageStreamEvent
4✔
18
from typing_extensions import override
4✔
19

20
from scope3ai.api.types import Scope3AIContext, ImpactRow
4✔
21
from scope3ai.constants import try_provider_for_client, CLIENTS
4✔
22
from scope3ai.lib import Scope3AI
4✔
23

24

25
# Type variables parameterizing the stream-manager wrappers below; bound to
# Anthropic's streaming helper classes so only those (or subclasses) qualify.
MessageStreamT = TypeVar("MessageStreamT", bound=_MessageStream)
AsyncMessageStreamT = TypeVar("AsyncMessageStreamT", bound=_AsyncMessageStream)
4✔
27

28

29
class Message(_Message):
    # Anthropic Message extended with the Scope3AI impact context recorded for
    # the request that produced it (None when no context was submitted).
    scope3ai: Optional[Scope3AIContext] = None
4✔
31

32

33
class MessageStream(_MessageStream):
    """Anthropic ``MessageStream`` that records a Scope3AI impact row.

    Token usage, model name and wall-clock latency are accumulated while the
    text stream is consumed; once the stream is exhausted the impact is
    submitted and the resulting context stored on ``scope3ai``.
    """

    scope3ai: Optional[Scope3AIContext] = None

    @override
    def __stream_text__(self) -> Iterator[str]:  # type: ignore[misc]
        timer_start = time.perf_counter()
        output_tokens = 0
        input_tokens = 0
        model_name: Optional[str] = None
        message = None
        for chunk in self:
            if type(chunk) is MessageStartEvent:
                message = chunk.message
                model_name = message.model
                input_tokens += message.usage.input_tokens
                output_tokens += message.usage.output_tokens
            elif type(chunk) is MessageDeltaEvent:
                output_tokens += chunk.usage.output_tokens
            elif (
                chunk.type == "content_block_delta" and chunk.delta.type == "text_delta"
            ):
                yield chunk.delta.text
            # Guard added: only read `message` if a start event was seen first,
            # otherwise this branch raised NameError on `message`.
            elif type(chunk) is MessageStopEvent and message is not None:
                input_tokens = message.usage.input_tokens
                output_tokens = message.usage.output_tokens

        requests_latency = time.perf_counter() - timer_start
        if model_name is not None:
            scope3_row = ImpactRow(
                # Added for consistency: every other ImpactRow in this module
                # (including the async counterpart of this method) tags the
                # managed service; this one was the only call site missing it.
                managed_service_id=try_provider_for_client(CLIENTS.ANTHROPIC),
                model_id=model_name,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                request_duration_ms=requests_latency * 1000,
            )
            self.scope3ai = Scope3AI.get_instance().submit_impact(scope3_row)

    def __init__(self, parent) -> None:  # noqa: ANN001
        # Rebuild from the already-initialized parent stream's internals so
        # iteration goes through this traced subclass.
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )
74

75

76
class AsyncMessageStream(_AsyncMessageStream):
    """Async Anthropic ``MessageStream`` that records a Scope3AI impact row.

    Token usage, model name and wall-clock latency are accumulated while the
    text stream is consumed; once the stream is exhausted the impact is
    submitted asynchronously and the context stored on ``scope3ai``.
    """

    scope3ai: Optional[Scope3AIContext] = None

    @override
    async def __stream_text__(self) -> AsyncIterator[str]:  # type: ignore[misc]
        timer_start = time.perf_counter()
        input_tokens = 0
        output_tokens = 0
        model_name: Optional[str] = None
        message = None
        async for chunk in self:
            if type(chunk) is MessageStartEvent:
                message = chunk.message
                model_name = message.model
                input_tokens += message.usage.input_tokens
                output_tokens += message.usage.output_tokens
            elif type(chunk) is MessageDeltaEvent:
                output_tokens += chunk.usage.output_tokens
            elif (
                chunk.type == "content_block_delta" and chunk.delta.type == "text_delta"
            ):
                yield chunk.delta.text
            # Guard added: only read `message` if a start event was seen first,
            # otherwise this branch raised NameError on `message`.
            elif type(chunk) is MessageStopEvent and message is not None:
                input_tokens = message.usage.input_tokens
                output_tokens = message.usage.output_tokens
        requests_latency = time.perf_counter() - timer_start
        if model_name is not None:
            scope3_row = ImpactRow(
                managed_service_id=try_provider_for_client(CLIENTS.ANTHROPIC),
                model_id=model_name,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                request_duration_ms=requests_latency * 1000,
            )
            self.scope3ai = await Scope3AI.get_instance().asubmit_impact(scope3_row)

    def __init__(self, parent) -> None:  # noqa: ANN001
        # Rebuild from the already-initialized parent stream's internals so
        # iteration goes through this traced subclass.
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )
117

118

119
class MessageStreamManager(Generic[MessageStreamT]):
    """Context manager that yields a Scope3AI-traced :class:`MessageStream`.

    Mirrors Anthropic's own MessageStreamManager, but wraps the stream it
    creates in the traced subclass.
    """

    def __init__(self, api_request: Callable[[], MessageStream]) -> None:
        self.__api_request = api_request
        # Initialize eagerly so __exit__ is a safe no-op even when __enter__
        # never ran (or raised before the stream was created); previously
        # this attribute did not exist and __exit__ raised AttributeError.
        self.__stream: Optional[MessageStream] = None

    def __enter__(self) -> MessageStream:
        # Perform the deferred API request, then re-wrap the resulting stream
        # in the traced subclass.
        self.__stream = MessageStream(self.__api_request())
        return self.__stream

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        if self.__stream is not None:
            self.__stream.close()
4✔
136

137

138
class AsyncMessageStreamManager(Generic[AsyncMessageStreamT]):
    """Async context manager that yields a traced :class:`AsyncMessageStream`.

    Mirrors Anthropic's own AsyncMessageStreamManager, but wraps the stream
    it creates in the traced subclass.
    """

    def __init__(self, api_request: Awaitable[AsyncMessageStream]) -> None:
        self.__api_request = api_request
        # Initialize eagerly so __aexit__ is a safe no-op even when __aenter__
        # never ran (or raised before the stream was created); previously
        # this attribute did not exist and __aexit__ raised AttributeError.
        self.__stream: Optional[AsyncMessageStream] = None

    async def __aenter__(self) -> AsyncMessageStream:
        # Await the deferred API request, then re-wrap the resulting stream
        # in the traced subclass.
        self.__stream = AsyncMessageStream(await self.__api_request)
        return self.__stream

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        if self.__stream is not None:
            await self.__stream.close()
4✔
155

156

157
class Stream(_Stream[_T]):
    """Raw-event Anthropic Stream that submits a Scope3AI impact row once
    the event stream has been fully consumed."""

    scope3ai: Optional[Scope3AIContext] = None

    def __stream__(self) -> Iterator[_T]:
        started_at = time.perf_counter()
        model = None
        prompt_tokens = 0
        completion_tokens = 0
        elapsed = 0
        for event in super().__stream__():
            yield event
            if type(event) is RawMessageStartEvent:
                # First event carries the model name and the prompt usage.
                model = event.message.model
                prompt_tokens = event.message.usage.input_tokens
            elif type(event) is RawMessageDeltaEvent:
                # Delta events carry the running output-token count; latency
                # is snapshotted at the last delta seen.
                completion_tokens = event.usage.output_tokens
                elapsed = time.perf_counter() - started_at

        self.scope3ai = Scope3AI.get_instance().submit_impact(
            ImpactRow(
                managed_service_id=try_provider_for_client(CLIENTS.ANTHROPIC),
                model_id=model,
                input_tokens=prompt_tokens,
                output_tokens=completion_tokens,
                request_duration_ms=elapsed * 1000,
            )
        )

    def __init__(self, parent) -> None:  # noqa: ANN001
        # Rebuild from the parent stream's internals so iteration goes
        # through this traced subclass.
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )
188

189

190
class AsyncStream(_AsyncStream[_T]):
    """Raw-event async Anthropic Stream that submits a Scope3AI impact row
    once the event stream has been fully consumed."""

    scope3ai: Optional[Scope3AIContext] = None

    async def __stream__(self) -> AsyncIterator[_T]:
        started_at = time.perf_counter()
        model = None
        prompt_tokens = 0
        completion_tokens = 0
        elapsed = 0
        async for event in super().__stream__():
            yield event
            if type(event) is RawMessageStartEvent:
                # First event carries the model name and the prompt usage.
                model = event.message.model
                prompt_tokens = event.message.usage.input_tokens
            elif type(event) is RawMessageDeltaEvent:
                # Delta events carry the running output-token count; latency
                # is snapshotted at the last delta seen.
                completion_tokens = event.usage.output_tokens
                elapsed = time.perf_counter() - started_at

        self.scope3ai = await Scope3AI.get_instance().asubmit_impact(
            ImpactRow(
                managed_service_id=try_provider_for_client(CLIENTS.ANTHROPIC),
                model_id=model,
                input_tokens=prompt_tokens,
                output_tokens=completion_tokens,
                request_duration_ms=elapsed * 1000,
            )
        )

    def __init__(self, parent) -> None:  # noqa: ANN001
        # Rebuild from the parent stream's internals so iteration goes
        # through this traced subclass.
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )
221

222

223
def _anthropic_chat_wrapper(response: Message, request_latency: float) -> Message:
    """Submit an impact row for a non-streaming chat response.

    Returns a copy of *response* carrying the Scope3AI context, or the
    original response unchanged when no context was produced.
    """
    impact = ImpactRow(
        managed_service_id=try_provider_for_client(CLIENTS.ANTHROPIC),
        model_id=response.model,
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
        request_duration_ms=request_latency * 1000,
    )
    ctx = Scope3AI.get_instance().submit_impact(impact)
    if ctx is None:
        return response
    return Message(**response.model_dump(), scope3ai=ctx)
×
237

238

239
def anthropic_chat_wrapper(
    wrapped: Callable,
    instance: Anthropic,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> Union[Message, Stream[RawMessageStreamEvent]]:
    """Trace a synchronous Anthropic ``messages.create`` call.

    Streaming requests are re-wrapped in the traced :class:`Stream`;
    non-streaming responses are converted into a :class:`Message` carrying
    the Scope3AI context.
    """
    # Fix: the ARG001 suppression belonged on `instance` (unused); `kwargs`
    # is used both for the call and for the stream flag.
    timer_start = time.perf_counter()
    response = wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start

    is_stream = kwargs.get("stream", False)
    if is_stream:
        return Stream(response)
    return _anthropic_chat_wrapper(response, request_latency)
4✔
253

254

255
async def _anthropic_async_chat_wrapper(
    response: Message, request_latency: float
) -> Message:
    """Submit an impact row for a non-streaming async chat response.

    Returns a copy of *response* carrying the Scope3AI context, or the
    original response unchanged when no context was produced.
    """
    impact = ImpactRow(
        managed_service_id=try_provider_for_client(CLIENTS.ANTHROPIC),
        model_id=response.model,
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
        request_duration_ms=request_latency * 1000,
    )
    ctx = await Scope3AI.get_instance().asubmit_impact(impact)
    if ctx is None:
        return response
    return Message(**response.model_dump(), scope3ai=ctx)
×
271

272

273
async def anthropic_async_chat_wrapper(
    wrapped: Callable,
    instance: AsyncAnthropic,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> Union[Message, AsyncStream[RawMessageStreamEvent]]:
    """Trace an asynchronous Anthropic ``messages.create`` call.

    Streaming requests are re-wrapped in the traced :class:`AsyncStream`;
    non-streaming responses are converted into a :class:`Message` carrying
    the Scope3AI context.
    """
    # Fix: the ARG001 suppression belonged on `instance` (unused); `kwargs`
    # is used both for the call and for the stream flag.
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start

    is_stream = kwargs.get("stream", False)
    if is_stream:
        return AsyncStream(response)
    return await _anthropic_async_chat_wrapper(response, request_latency)
4✔
287

288

289
def anthropic_stream_chat_wrapper(
    wrapped: Callable,
    instance: Anthropic,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> MessageStreamManager:
    """Trace Anthropic ``messages.stream`` by rebuilding the returned
    manager around the Scope3AI-aware :class:`MessageStream`."""
    # Fix: the ARG001 suppression belonged on `instance` (unused); `kwargs`
    # is forwarded to the wrapped call.
    response = wrapped(*args, **kwargs)
    # Reuse the pending request captured by the original manager
    # (name-mangled private attribute).
    return MessageStreamManager(response._MessageStreamManager__api_request)  # noqa: SLF001
4✔
297

298

299
def anthropic_async_stream_chat_wrapper(
    wrapped: Callable,
    instance: AsyncAnthropic,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> AsyncMessageStreamManager:
    """Trace async Anthropic ``messages.stream`` by rebuilding the returned
    manager around the Scope3AI-aware :class:`AsyncMessageStream`."""
    # Fix: the ARG001 suppression belonged on `instance` (unused); `kwargs`
    # is forwarded to the wrapped call.
    response = wrapped(*args, **kwargs)
    # Reuse the pending awaitable captured by the original manager
    # (name-mangled private attribute).
    return AsyncMessageStreamManager(response._AsyncMessageStreamManager__api_request)  # noqa: SLF001
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc