scope3data / scope3ai-py / build 12562311504 (github)

31 Dec 2024 05:28PM UTC · coverage: 82.293% (+1.7%) from 80.557%

Pull Request #36: fix(worker): add more unit tests and fix some behavior of the worker
Commit 1ea4ba by tito: fix: try increasing the timeout

5 of 5 new or added lines in 1 file covered. (100.0%)
1 existing line in 1 file now uncovered.
1213 of 1474 relevant lines covered (82.29%)
1.65 hits per line

Source File

/scope3ai/tracers/anthropic/chat.py · 96.47% covered
(Lines with no hits in this build are marked "# not covered" in the listing below.)

import time
from collections.abc import AsyncIterator, Awaitable, Iterator
from types import TracebackType
from typing import Any, Callable, Generic, Optional, TypeVar, Union
from typing_extensions import override

from anthropic import Anthropic, AsyncAnthropic
from anthropic import Stream as _Stream, AsyncStream as _AsyncStream
from anthropic.lib.streaming import AsyncMessageStream as _AsyncMessageStream
from anthropic.lib.streaming import MessageStream as _MessageStream
from anthropic.types import Message as _Message
from anthropic.types.message_delta_event import MessageDeltaEvent
from anthropic.types.message_start_event import MessageStartEvent
from anthropic.types.message_stop_event import MessageStopEvent
from anthropic.types.raw_message_stream_event import RawMessageStreamEvent
from anthropic.types.raw_message_start_event import RawMessageStartEvent
from anthropic.types.raw_message_delta_event import RawMessageDeltaEvent
from anthropic._streaming import _T

from scope3ai.lib import Scope3AI
from scope3ai.api.types import Scope3AIContext, Model, ImpactRow

PROVIDER = "anthropic"

MessageStreamT = TypeVar("MessageStreamT", bound=_MessageStream)
AsyncMessageStreamT = TypeVar("AsyncMessageStreamT", bound=_AsyncMessageStream)


class Message(_Message):
    # Anthropic's Message extended with the impact context recorded by the tracer.
    scope3ai: Optional[Scope3AIContext] = None


class MessageStream(_MessageStream):
    # Wraps the SDK's MessageStream to count tokens while text is streamed,
    # then submits an ImpactRow once the stream is exhausted.
    scope3ai: Optional[Scope3AIContext] = None

    @override
    def __stream_text__(self) -> Iterator[str]:  # type: ignore[misc]
        timer_start = time.perf_counter()
        output_tokens = 0
        input_tokens = 0
        model_name = None
        for chunk in self:
            if type(chunk) is MessageStartEvent:
                message = chunk.message
                model_name = message.model
                input_tokens += message.usage.input_tokens
                output_tokens += message.usage.output_tokens
            elif type(chunk) is MessageDeltaEvent:
                output_tokens += chunk.usage.output_tokens
            elif (
                chunk.type == "content_block_delta" and chunk.delta.type == "text_delta"
            ):
                yield chunk.delta.text
            elif type(chunk) is MessageStopEvent:
                # Final usage, re-read from the message captured at stream start.
                input_tokens = message.usage.input_tokens  # not covered
                output_tokens = message.usage.output_tokens  # not covered

        requests_latency = time.perf_counter() - timer_start
        if model_name is not None:
            scope3_row = ImpactRow(
                model=Model(id=model_name),
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                request_duration_ms=requests_latency * 1000,
                managed_service_id=PROVIDER,
            )
            self.scope3ai = Scope3AI.get_instance().submit_impact(scope3_row)

    def __init__(self, parent) -> None:  # noqa: ANN001
        # Rebuild the stream from the SDK stream returned by the wrapped call.
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )


class AsyncMessageStream(_AsyncMessageStream):
    # Async counterpart of MessageStream above, with identical accounting.
    scope3ai: Optional[Scope3AIContext] = None

    @override
    async def __stream_text__(self) -> AsyncIterator[str]:  # type: ignore[misc]
        timer_start = time.perf_counter()
        input_tokens = 0
        output_tokens = 0
        model_name = None
        async for chunk in self:
            if type(chunk) is MessageStartEvent:
                message = chunk.message
                model_name = message.model
                input_tokens += message.usage.input_tokens
                output_tokens += message.usage.output_tokens
            elif type(chunk) is MessageDeltaEvent:
                output_tokens += chunk.usage.output_tokens
            elif (
                chunk.type == "content_block_delta" and chunk.delta.type == "text_delta"
            ):
                yield chunk.delta.text
            elif type(chunk) is MessageStopEvent:
                # Final usage, re-read from the message captured at stream start.
                input_tokens = message.usage.input_tokens  # not covered
                output_tokens = message.usage.output_tokens  # not covered

        requests_latency = time.perf_counter() - timer_start
        if model_name is not None:
            scope3_row = ImpactRow(
                model=Model(id=model_name),
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                request_duration_ms=requests_latency * 1000,
                managed_service_id=PROVIDER,
            )
            self.scope3ai = Scope3AI.get_instance().submit_impact(scope3_row)

    def __init__(self, parent) -> None:  # noqa: ANN001
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )


class MessageStreamManager(Generic[MessageStreamT]):
    # Context manager that defers the API request until __enter__, then swaps
    # the SDK stream for the instrumented MessageStream.
    def __init__(self, api_request: Callable[[], MessageStream]) -> None:
        self.__api_request = api_request

    def __enter__(self) -> MessageStream:
        self.__stream = self.__api_request()
        self.__stream = MessageStream(self.__stream)
        return self.__stream

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        if self.__stream is not None:
            self.__stream.close()


class AsyncMessageStreamManager(Generic[AsyncMessageStreamT]):
    # Async counterpart: the pending request is awaited in __aenter__.
    def __init__(self, api_request: Awaitable[AsyncMessageStream]) -> None:
        self.__api_request = api_request

    async def __aenter__(self) -> AsyncMessageStream:
        self.__stream = await self.__api_request
        self.__stream = AsyncMessageStream(self.__stream)
        return self.__stream

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        if self.__stream is not None:
            await self.__stream.close()


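For orientation, a minimal usage sketch of the streaming context-manager path (illustrative, not part of this file): it assumes the Scope3AI tracer has been initialized, an Anthropic API key is configured, and messages.stream has been patched with the wrappers defined at the end of this module; the model id and prompt are placeholders.

client = Anthropic()
with client.messages.stream(
    model="claude-3-5-sonnet-latest",  # placeholder model id
    max_tokens=64,
    messages=[{"role": "user", "content": "Hello"}],
) as stream:  # the patched manager yields the instrumented MessageStream
    for text in stream.text_stream:  # consumes __stream_text__ above
        print(text, end="")
print(stream.scope3ai)  # Scope3AIContext submitted once the stream ended

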
class Stream(_Stream[_T]):
    # Instrumented raw event stream, returned when create(..., stream=True)
    # is called directly instead of going through the messages.stream() helper.
    scope3ai: Optional[Scope3AIContext] = None

    def __stream__(self) -> Iterator[_T]:
        timer_start = time.perf_counter()
        model = None
        input_tokens = output_tokens = request_latency = 0
        for event in super().__stream__():
            yield event
            if type(event) is RawMessageStartEvent:
                model = event.message.model
                input_tokens = event.message.usage.input_tokens
            elif type(event) is RawMessageDeltaEvent:
                output_tokens = event.usage.output_tokens
                request_latency = time.perf_counter() - timer_start

        scope3_row = ImpactRow(
            model=Model(id=model),
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        self.scope3ai = Scope3AI.get_instance().submit_impact(scope3_row)

    def __init__(self, parent) -> None:  # noqa: ANN001
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )


class AsyncStream(_AsyncStream[_T]):
    # Async counterpart of Stream above.
    scope3ai: Optional[Scope3AIContext] = None

    async def __stream__(self) -> AsyncIterator[_T]:
        timer_start = time.perf_counter()
        model = None
        input_tokens = output_tokens = request_latency = 0
        async for event in super().__stream__():
            yield event
            if type(event) is RawMessageStartEvent:
                model = event.message.model
                input_tokens = event.message.usage.input_tokens
            elif type(event) is RawMessageDeltaEvent:
                output_tokens = event.usage.output_tokens
                request_latency = time.perf_counter() - timer_start

        scope3_row = ImpactRow(
            model=Model(id=model),
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        self.scope3ai = Scope3AI.get_instance().submit_impact(scope3_row)

    def __init__(self, parent) -> None:  # noqa: ANN001
        super().__init__(
            cast_to=parent._cast_to,  # noqa: SLF001
            response=parent.response,
            client=parent._client,  # noqa: SLF001
        )


def _anthropic_chat_wrapper(response: Message, request_latency: float) -> Message:
    # Attach the impact context to a completed (non-streaming) response.
    model_name = response.model
    scope3_row = ImpactRow(
        model=Model(id=model_name),
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
        request_duration_ms=request_latency * 1000,
        managed_service_id=PROVIDER,
    )
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    if scope3ai_ctx is not None:
        return Message(**response.model_dump(), scope3ai=scope3ai_ctx)
    else:
        return response  # not covered


def anthropic_chat_wrapper(
    wrapped: Callable,
    instance: Anthropic,
    args: Any,
    kwargs: Any,  # noqa: ARG001
) -> Union[Message, Stream[RawMessageStreamEvent]]:
    timer_start = time.perf_counter()
    response = wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start

    is_stream = kwargs.get("stream", False)
    if is_stream:
        return Stream(response)
    return _anthropic_chat_wrapper(response, request_latency)


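A matching sketch for the wrapped non-streaming call (again illustrative, reusing the client from the earlier sketch): with the default stream=False the caller receives the Message subclass carrying scope3ai, while stream=True returns the instrumented Stream defined above.

response = client.messages.create(
    model="claude-3-5-sonnet-latest",  # placeholder model id
    max_tokens=64,
    messages=[{"role": "user", "content": "Hello"}],
)
print(response.scope3ai)  # attached by _anthropic_chat_wrapper

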
async def _anthropic_async_chat_wrapper(
    response: Message, request_latency: float
) -> Message:
    # Async counterpart of _anthropic_chat_wrapper above.
    model_name = response.model
    scope3_row = ImpactRow(
        model=Model(id=model_name),
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
        request_duration_ms=request_latency * 1000,
        managed_service_id=PROVIDER,
    )
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    if scope3ai_ctx is not None:
        return Message(**response.model_dump(), scope3ai=scope3ai_ctx)
    else:
        return response  # not covered


async def anthropic_async_chat_wrapper(
    wrapped: Callable,
    instance: AsyncAnthropic,
    args: Any,
    kwargs: Any,  # noqa: ARG001
) -> Union[Message, AsyncStream[RawMessageStreamEvent]]:
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start

    is_stream = kwargs.get("stream", False)
    if is_stream:
        return AsyncStream(response)
    return await _anthropic_async_chat_wrapper(response, request_latency)


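The async path mirrors this; a brief sketch under the same assumptions:

async def demo() -> None:
    async_client = AsyncAnthropic()
    response = await async_client.messages.create(
        model="claude-3-5-sonnet-latest",  # placeholder model id
        max_tokens=64,
        messages=[{"role": "user", "content": "Hello"}],
    )
    print(response.scope3ai)

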
def anthropic_stream_chat_wrapper(
    wrapped: Callable,
    instance: Anthropic,
    args: Any,
    kwargs: Any,  # noqa: ARG001
) -> MessageStreamManager:
    # The SDK returns a manager holding a deferred request; re-wrap its
    # private request callable in the instrumented manager defined above.
    response = wrapped(*args, **kwargs)
    return MessageStreamManager(response._MessageStreamManager__api_request)  # noqa: SLF001


def anthropic_async_stream_chat_wrapper(
    wrapped: Callable,
    instance: AsyncAnthropic,
    args: Any,
    kwargs: Any,  # noqa: ARG001
) -> AsyncMessageStreamManager:
    response = wrapped(*args, **kwargs)
    return AsyncMessageStreamManager(response._AsyncMessageStreamManager__api_request)  # noqa: SLF001
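
The (wrapped, instance, args, kwargs) signature of these wrappers follows the wrapt convention, so the wiring is presumably done by a wrapt-style patcher elsewhere in scope3ai. A hypothetical registration sketch; the module and attribute paths below are assumptions, not the package's actual patch targets:

import wrapt

wrapt.wrap_function_wrapper(
    "anthropic.resources.messages", "Messages.create", anthropic_chat_wrapper
)
wrapt.wrap_function_wrapper(
    "anthropic.resources.messages", "AsyncMessages.create", anthropic_async_chat_wrapper
)
wrapt.wrap_function_wrapper(
    "anthropic.resources.messages", "Messages.stream", anthropic_stream_chat_wrapper
)
wrapt.wrap_function_wrapper(
    "anthropic.resources.messages", "AsyncMessages.stream", anthropic_async_stream_chat_wrapper
)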