• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scope3data / scope3ai-py / 13416015178

19 Feb 2025 03:18PM UTC coverage: 96.179% (+15.6%) from 80.557%
13416015178

Pull #91

github

404fae
web-flow
Merge b16436a44 into 37d564f57
Pull Request #91: docs: minor readme edits

2542 of 2643 relevant lines covered (96.18%)

3.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.04
/scope3ai/tracers/anthropic/chat.py
1
import time
4✔
2
from collections.abc import AsyncIterator, Awaitable, Iterator
4✔
3
from types import TracebackType
4✔
4
from typing import Any, Callable, Generic, Optional, TypeVar, Union
4✔
5

6
from anthropic import Anthropic, AsyncAnthropic
4✔
7
from anthropic import Stream as _Stream, AsyncStream as _AsyncStream
4✔
8
from anthropic._streaming import _T
4✔
9
from anthropic.lib.streaming import AsyncMessageStream as _AsyncMessageStream
4✔
10
from anthropic.lib.streaming import MessageStream as _MessageStream
4✔
11
from anthropic.types import Message as _Message
4✔
12
from anthropic.types.message_delta_event import MessageDeltaEvent
4✔
13
from anthropic.types.message_start_event import MessageStartEvent
4✔
14
from anthropic.types.message_stop_event import MessageStopEvent
4✔
15
from anthropic.types.raw_message_delta_event import RawMessageDeltaEvent
4✔
16
from anthropic.types.raw_message_start_event import RawMessageStartEvent
4✔
17
from anthropic.types.raw_message_stream_event import RawMessageStreamEvent
4✔
18
from typing_extensions import override
4✔
19

20
from scope3ai.api.types import Scope3AIContext, ImpactRow
4✔
21
from scope3ai.lib import Scope3AI
4✔
22

23

24
# Type variables bounding the stream-manager generics below to the Anthropic
# SDK's sync/async message-stream base classes.
MessageStreamT = TypeVar("MessageStreamT", bound=_MessageStream)
AsyncMessageStreamT = TypeVar("AsyncMessageStreamT", bound=_AsyncMessageStream)
26

27

28
class Message(_Message):
    """Anthropic Message extended with the Scope3AI impact context."""

    # None until an impact row has been submitted for this response.
    scope3ai: Optional[Scope3AIContext] = None
30

31

32
class MessageStream(_MessageStream):
    """Anthropic MessageStream that submits a Scope3AI impact row once the
    text stream has been fully consumed."""

    # Populated after the stream ends and the impact has been submitted.
    scope3ai: Optional[Scope3AIContext] = None

    @override
    def __stream_text__(self) -> Iterator[str]:  # type: ignore[misc]
        """Yield text deltas while accumulating token usage, then submit
        the impact measurement for the whole request.

        Yields:
            str: each text delta from ``content_block_delta`` events.
        """
        timer_start = time.perf_counter()
        output_tokens = 0
        input_tokens = 0
        model_name = None
        # Snapshot of the message from the start event; None until seen.
        # Guarding on it below avoids an UnboundLocalError if a stop event
        # arrives without a preceding MessageStartEvent.
        message = None
        for chunk in self:
            if type(chunk) is MessageStartEvent:
                message = chunk.message
                model_name = message.model
                input_tokens += message.usage.input_tokens
                output_tokens += message.usage.output_tokens
            elif type(chunk) is MessageDeltaEvent:
                output_tokens += chunk.usage.output_tokens
            elif (
                chunk.type == "content_block_delta" and chunk.delta.type == "text_delta"
            ):
                yield chunk.delta.text
            elif type(chunk) is MessageStopEvent and message is not None:
                # Prefer the usage recorded on the message snapshot over the
                # per-event accumulation once the stream is complete.
                input_tokens = message.usage.input_tokens
                output_tokens = message.usage.output_tokens

        requests_latency = time.perf_counter() - timer_start
        # Only submit when a start event identified the model.
        if model_name is not None:
            scope3_row = ImpactRow(
                model_id=model_name,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                request_duration_ms=requests_latency * 1000,
            )
            self.scope3ai = Scope3AI.get_instance().submit_impact(scope3_row)

    def __init__(self, parent) -> None:  # noqa: ANN001
        """Rebuild the SDK stream from *parent* so this subclass's
        instrumented ``__stream_text__`` is used."""
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )
73

74

75
class AsyncMessageStream(_AsyncMessageStream):
    """Async Anthropic MessageStream that submits a Scope3AI impact row once
    the text stream has been fully consumed."""

    # Populated after the stream ends and the impact has been submitted.
    scope3ai: Optional[Scope3AIContext] = None

    @override
    async def __stream_text__(self) -> AsyncIterator[str]:  # type: ignore[misc]
        """Yield text deltas while accumulating token usage, then submit
        the impact measurement for the whole request.

        Yields:
            str: each text delta from ``content_block_delta`` events.
        """
        timer_start = time.perf_counter()
        input_tokens = 0
        output_tokens = 0
        model_name = None
        # Snapshot of the message from the start event; None until seen.
        # Guarding on it below avoids an UnboundLocalError if a stop event
        # arrives without a preceding MessageStartEvent.
        message = None
        async for chunk in self:
            if type(chunk) is MessageStartEvent:
                message = chunk.message
                model_name = message.model
                input_tokens += message.usage.input_tokens
                output_tokens += message.usage.output_tokens
            elif type(chunk) is MessageDeltaEvent:
                output_tokens += chunk.usage.output_tokens
            elif (
                chunk.type == "content_block_delta" and chunk.delta.type == "text_delta"
            ):
                yield chunk.delta.text
            elif type(chunk) is MessageStopEvent and message is not None:
                # Prefer the usage recorded on the message snapshot over the
                # per-event accumulation once the stream is complete.
                input_tokens = message.usage.input_tokens
                output_tokens = message.usage.output_tokens
        requests_latency = time.perf_counter() - timer_start
        # Only submit when a start event identified the model.
        if model_name is not None:
            scope3_row = ImpactRow(
                model_id=model_name,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                request_duration_ms=requests_latency * 1000,
            )
            self.scope3ai = await Scope3AI.get_instance().asubmit_impact(scope3_row)

    def __init__(self, parent) -> None:  # noqa: ANN001
        """Rebuild the SDK stream from *parent* so this subclass's
        instrumented ``__stream_text__`` is used."""
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )
115

116

117
class MessageStreamManager(Generic[MessageStreamT]):
    """Context manager wrapping the SDK's MessageStreamManager so entering
    it yields a Scope3AI-instrumented :class:`MessageStream`."""

    def __init__(self, api_request: Callable[[], MessageStream]) -> None:
        """Store the deferred API call that produces the raw stream."""
        self.__api_request = api_request
        # Initialize eagerly so __exit__'s None check is meaningful even if
        # __enter__ was never called (previously this raised AttributeError).
        self.__stream: Optional[MessageStream] = None

    def __enter__(self) -> MessageStream:
        """Execute the deferred request and wrap the result."""
        self.__stream = MessageStream(self.__api_request())
        return self.__stream

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        if self.__stream is not None:
            self.__stream.close()
134

135

136
class AsyncMessageStreamManager(Generic[AsyncMessageStreamT]):
    """Async context manager wrapping the SDK's AsyncMessageStreamManager so
    entering it yields a Scope3AI-instrumented :class:`AsyncMessageStream`."""

    def __init__(self, api_request: Awaitable[AsyncMessageStream]) -> None:
        """Store the awaitable API call that produces the raw stream."""
        self.__api_request = api_request
        # Initialize eagerly so __aexit__'s None check is meaningful even if
        # __aenter__ was never called (previously this raised AttributeError).
        self.__stream: Optional[AsyncMessageStream] = None

    async def __aenter__(self) -> AsyncMessageStream:
        """Await the request and wrap the result."""
        self.__stream = AsyncMessageStream(await self.__api_request)
        return self.__stream

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        if self.__stream is not None:
            await self.__stream.close()
153

154

155
class Stream(_Stream[_T]):
    """Raw Anthropic event Stream that submits a Scope3AI impact row once
    the stream has been fully consumed."""

    # Populated after the stream ends and the impact has been submitted.
    scope3ai: Optional[Scope3AIContext] = None

    def __stream__(self) -> Iterator[_T]:
        """Pass every event through unchanged while tracking model name,
        token usage, and time-to-last-delta, then submit the impact."""
        timer_start = time.perf_counter()
        model = None
        input_tokens = output_tokens = request_latency = 0
        for event in super().__stream__():
            yield event
            if type(event) is RawMessageStartEvent:
                model = event.message.model
                input_tokens = event.message.usage.input_tokens
            elif type(event) is RawMessageDeltaEvent:
                output_tokens = event.usage.output_tokens
                request_latency = time.perf_counter() - timer_start

        # Only submit when a start event identified the model — consistent
        # with the guard used by MessageStream.__stream_text__.
        if model is not None:
            scope3_row = ImpactRow(
                model_id=model,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                request_duration_ms=request_latency * 1000,
            )
            self.scope3ai = Scope3AI.get_instance().submit_impact(scope3_row)

    def __init__(self, parent) -> None:  # noqa: ANN001
        """Rebuild the SDK stream from *parent* so this subclass's
        instrumented ``__stream__`` is used."""
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )
185

186

187
class AsyncStream(_AsyncStream[_T]):
    """Raw async Anthropic event Stream that submits a Scope3AI impact row
    once the stream has been fully consumed."""

    # Populated after the stream ends and the impact has been submitted.
    scope3ai: Optional[Scope3AIContext] = None

    async def __stream__(self) -> AsyncIterator[_T]:
        """Pass every event through unchanged while tracking model name,
        token usage, and time-to-last-delta, then submit the impact."""
        timer_start = time.perf_counter()
        model = None
        input_tokens = output_tokens = request_latency = 0
        async for event in super().__stream__():
            yield event
            if type(event) is RawMessageStartEvent:
                model = event.message.model
                input_tokens = event.message.usage.input_tokens
            elif type(event) is RawMessageDeltaEvent:
                output_tokens = event.usage.output_tokens
                request_latency = time.perf_counter() - timer_start

        # Only submit when a start event identified the model — consistent
        # with the guard used by AsyncMessageStream.__stream_text__.
        if model is not None:
            scope3_row = ImpactRow(
                model_id=model,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                request_duration_ms=request_latency * 1000,
            )
            self.scope3ai = await Scope3AI.get_instance().asubmit_impact(scope3_row)

    def __init__(self, parent) -> None:  # noqa: ANN001
        """Rebuild the SDK stream from *parent* so this subclass's
        instrumented ``__stream__`` is used."""
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )
217

218

219
def _anthropic_chat_wrapper(response: Message, request_latency: float) -> Message:
    """Submit an impact row for a non-streaming chat response.

    Returns a copy of *response* carrying the Scope3AI context, or the
    original response unchanged when no context was produced.
    """
    scope3_row = ImpactRow(
        model_id=response.model,
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
        request_duration_ms=request_latency * 1000,
    )
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    if scope3ai_ctx is None:
        return response
    return Message(**response.model_dump(), scope3ai=scope3ai_ctx)
232

233

234
def anthropic_chat_wrapper(
    wrapped: Callable,
    instance: Anthropic,
    args: Any,
    kwargs: Any,  # noqa: ARG001
) -> Union[Message, Stream[RawMessageStreamEvent]]:
    """Sync Messages.create wrapper: time the call, then either wrap the
    stream or attach the impact context to the finished response."""
    timer_start = time.perf_counter()
    response = wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start

    if kwargs.get("stream", False):
        return Stream(response)
    return _anthropic_chat_wrapper(response, request_latency)
248

249

250
async def _anthropic_async_chat_wrapper(
    response: Message, request_latency: float
) -> Message:
    """Submit an impact row for a non-streaming async chat response.

    Returns a copy of *response* carrying the Scope3AI context, or the
    original response unchanged when no context was produced.
    """
    scope3_row = ImpactRow(
        model_id=response.model,
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
        request_duration_ms=request_latency * 1000,
    )
    scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
    if scope3ai_ctx is None:
        return response
    return Message(**response.model_dump(), scope3ai=scope3ai_ctx)
265

266

267
async def anthropic_async_chat_wrapper(
    wrapped: Callable,
    instance: AsyncAnthropic,
    args: Any,
    kwargs: Any,  # noqa: ARG001
) -> Union[Message, AsyncStream[RawMessageStreamEvent]]:
    """Async Messages.create wrapper: time the call, then either wrap the
    stream or attach the impact context to the finished response."""
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start

    if kwargs.get("stream", False):
        return AsyncStream(response)
    return await _anthropic_async_chat_wrapper(response, request_latency)
281

282

283
def anthropic_stream_chat_wrapper(
    wrapped: Callable,
    instance: Anthropic,
    args: Any,
    kwargs: Any,  # noqa: ARG001
) -> MessageStreamManager:
    """Messages.stream wrapper: replace the SDK's manager with one whose
    __enter__ yields an instrumented MessageStream."""
    response = wrapped(*args, **kwargs)
    # Lift the deferred request out of the SDK manager (name-mangled attr).
    api_request = response._MessageStreamManager__api_request  # noqa: SLF001
    return MessageStreamManager(api_request)
291

292

293
def anthropic_async_stream_chat_wrapper(
    wrapped: Callable,
    instance: AsyncAnthropic,
    args: Any,
    kwargs: Any,  # noqa: ARG001
) -> AsyncMessageStreamManager:
    """Async Messages.stream wrapper: replace the SDK's manager with one
    whose __aenter__ yields an instrumented AsyncMessageStream."""
    response = wrapped(*args, **kwargs)
    # Lift the awaitable request out of the SDK manager (name-mangled attr).
    api_request = response._AsyncMessageStreamManager__api_request  # noqa: SLF001
    return AsyncMessageStreamManager(api_request)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc