scope3data / scope3ai-py, build 12953930249

24 Jan 2025 04:50PM UTC coverage: 96.405% (+15.8%) from 80.557%

Pull Request #78: feat(api): synchronize api, fixes pyright issues and api example
Source: github, commit 89ada1 (web-flow), Merge 1e5564797 into 0cfbba85d

69 of 71 new or added lines in 2 files covered (97.18%).
48 existing lines in 10 files are now uncovered.
2440 of 2531 relevant lines covered (96.4%).
3.85 hits per line.
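
The headline percentages follow directly from the raw counts above; a quick check in plain Python (nothing here is project-specific):

# Recompute the summary figures from the raw line counts reported above.
new_covered, new_total = 69, 71
print(f"{new_covered / new_total:.2%}")            # 97.18% of new or added lines

relevant_covered, relevant_total = 2440, 2531
print(f"{relevant_covered / relevant_total:.1%}")  # 96.4% of relevant lines

print(f"{96.405 - 80.557:+.1f}")                   # +15.8 percentage-point change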

Source File: /scope3ai/tracers/litellm/chat.py (89.47% covered; the 10 uncovered lines are marked "# UNCOVERED" below)

import logging
import time
from typing import Any, Callable, Optional, Union

from litellm import AsyncCompletions, Completions
from litellm.types.utils import ModelResponse
from litellm.utils import CustomStreamWrapper

from scope3ai import Scope3AI
from scope3ai.api.types import Scope3AIContext, ImpactRow
from scope3ai.constants import PROVIDERS
from scope3ai.tracers.utils.multimodal import aggregate_multimodal

PROVIDER = PROVIDERS.LITELLM.value

logger = logging.getLogger("scope3ai.tracers.litellm.chat")


class ChatCompletion(ModelResponse):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(ModelResponse):
    scope3ai: Optional[Scope3AIContext] = None


def litellm_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, CustomStreamWrapper]:
    if kwargs.get("stream", False):
        return litellm_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return litellm_chat_wrapper_non_stream(wrapped, instance, args, kwargs)


def litellm_chat_wrapper_stream(  # type: ignore[misc]
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> CustomStreamWrapper:
    timer_start = time.perf_counter()
    stream = wrapped(*args, **kwargs)
    token_count = 0
    for i, chunk in enumerate(stream):
        if i > 0 and chunk.choices[0].finish_reason is None:
            token_count += 1
        request_latency = time.perf_counter() - timer_start

        model = chunk.model
        if model is not None:
            scope3_row = ImpactRow(
                model_id=model,
                output_tokens=token_count,
                request_duration_ms=float(request_latency) * 1000,
                managed_service_id=PROVIDER,
            )
            scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            if scope3ai_ctx is not None:
                yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3ai_ctx)
            else:
                yield chunk  # UNCOVERED
        else:
            yield chunk  # UNCOVERED


def litellm_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    keep_traces = not kwargs.pop("use_always_litellm_tracer", False)
    with Scope3AI.get_instance().trace(keep_traces=keep_traces) as tracer:
        response = wrapped(*args, **kwargs)
        if tracer.traces:
            setattr(response, "scope3ai", tracer.traces[0])
            return response
    request_latency = time.perf_counter() - timer_start
    model = response.model
    if model is None:
        return response  # UNCOVERED
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.total_tokens,
        request_duration_ms=float(request_latency) * 1000,
        managed_service_id=PROVIDER,
    )
    messages = args[1] if len(args) > 1 else kwargs.get("messages")
    for message in messages:
        aggregate_multimodal(message, scope3_row, logger)
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    if scope3ai_ctx is not None:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)
    else:
        return response  # UNCOVERED


async def litellm_async_chat_wrapper(
    wrapped: Callable, instance: AsyncCompletions, args: Any, kwargs: Any
) -> Union[ChatCompletion, CustomStreamWrapper]:
    if kwargs.get("stream", False):
        return litellm_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await litellm_async_chat_wrapper_base(wrapped, instance, args, kwargs)


async def litellm_async_chat_wrapper_base(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    keep_traces = not kwargs.pop("use_always_litellm_tracer", False)
    with Scope3AI.get_instance().trace(keep_traces=keep_traces) as tracer:
        response = await wrapped(*args, **kwargs)
        if tracer.traces:
            setattr(response, "scope3ai", tracer.traces[0])  # UNCOVERED
            return response  # UNCOVERED
    request_latency = time.perf_counter() - timer_start
    model = response.model
    if model is None:
        return response  # UNCOVERED
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.total_tokens,
        request_duration_ms=float(request_latency) * 1000,
        managed_service_id=PROVIDER,
    )
    scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
    if scope3ai_ctx is not None:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)
    else:
        return response  # UNCOVERED


async def litellm_async_chat_wrapper_stream(  # type: ignore[misc]
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> CustomStreamWrapper:
    timer_start = time.perf_counter()
    stream = await wrapped(*args, **kwargs)
    i = 0
    token_count = 0
    async for chunk in stream:
        if i > 0 and chunk.choices[0].finish_reason is None:
            token_count += 1
        request_latency = time.perf_counter() - timer_start
        model = chunk.model
        if model is not None:
            scope3_row = ImpactRow(
                model_id=model,
                output_tokens=token_count,
                request_duration_ms=float(request_latency) * 1000,
                managed_service_id=PROVIDER,
            )
            scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
            if scope3ai_ctx is not None:
                yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3ai_ctx)
            else:
                yield chunk  # UNCOVERED
        else:
            yield chunk  # UNCOVERED
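
The (wrapped, instance, args, kwargs) signatures indicate these functions are meant to be installed as wrapt-style method wrappers around litellm's Completions and AsyncCompletions. A rough consumer-side sketch, assuming the instrumentation is already installed; the Scope3AI.init() call and the model name are illustrative assumptions, not taken from this file, and Completions().create() is assumed to follow the OpenAI-style signature:

# Hypothetical usage sketch. Scope3AI.init() and "gpt-4o-mini" are assumptions
# for illustration; the scope3ai attribute on the result is what the wrappers
# above attach.
import litellm
from scope3ai import Scope3AI

Scope3AI.init()  # assumed entry point that patches litellm's Completions

client = litellm.Completions()

# Non-streaming: litellm_chat_wrapper_non_stream() returns a ChatCompletion
# whose scope3ai field holds the Scope3AIContext from submit_impact().
response = client.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.scope3ai)

# Streaming: litellm_chat_wrapper_stream() yields ChatCompletionChunk objects;
# output_tokens in each ImpactRow counts the content chunks seen so far.
for chunk in client.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
):
    print(getattr(chunk, "scope3ai", None))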