
scope3data / scope3ai-py — build 12920205472

23 Jan 2025 01:12AM UTC — coverage: 95.993% (+15.4%) from 80.557%

Pull Request #74: feat(metadata): include many metadata accessible at global or tracer level
Commit 9af6ef by tito — "fix: try fixing test again" (via github)

49 of 50 new or added lines in 4 files covered (98.0%).
53 existing lines in 10 files now uncovered.
2204 of 2296 relevant lines covered (95.99%).
3.84 hits per line.

Source File

/scope3ai/tracers/litellm/chat.py — 88.76% covered
(Lines marked "# uncovered" below were not hit by the test suite.)
import logging
import time
from typing import Any, Callable, Optional, Union

from litellm import AsyncCompletions, Completions
from litellm.types.utils import ModelResponse
from litellm.utils import CustomStreamWrapper

from scope3ai import Scope3AI
from scope3ai.api.types import Scope3AIContext, ImpactRow
from scope3ai.constants import PROVIDERS
from scope3ai.tracers.utils.multimodal import aggregate_multimodal

PROVIDER = PROVIDERS.LITELLM.value

logger = logging.getLogger("scope3ai.tracers.litellm.chat")


class ChatCompletion(ModelResponse):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(ModelResponse):
    scope3ai: Optional[Scope3AIContext] = None


def litellm_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, CustomStreamWrapper]:
    if kwargs.get("stream", False):
        return litellm_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return litellm_chat_wrapper_non_stream(wrapped, instance, args, kwargs)


def litellm_chat_wrapper_stream(  # type: ignore[misc]
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> CustomStreamWrapper:
    timer_start = time.perf_counter()
    stream = wrapped(*args, **kwargs)
    token_count = 0
    for i, chunk in enumerate(stream):
        if i > 0 and chunk.choices[0].finish_reason is None:
            token_count += 1
        request_latency = time.perf_counter() - timer_start

        model = chunk.model
        if model is not None:
            scope3_row = ImpactRow(
                model_id=model,
                output_tokens=token_count,
                request_duration_ms=float(request_latency) * 1000,
                managed_service_id=PROVIDER,
            )
            scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            if scope3ai_ctx is not None:
                yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3ai_ctx)
            else:
                yield chunk  # uncovered
        else:
            yield chunk  # uncovered


def litellm_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    with Scope3AI.get_instance().trace(keep_traces=True) as trace:
        response = wrapped(*args, **kwargs)
        if trace.traces:
            setattr(response, "scope3ai", trace.traces[0])  # uncovered
            return response  # uncovered
    request_latency = time.perf_counter() - timer_start
    model = response.model
    if model is None:
        return response  # uncovered
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.total_tokens,
        request_duration_ms=float(request_latency) * 1000,
        managed_service_id=PROVIDER,
    )
    messages = args[1] if len(args) > 1 else kwargs.get("messages")
    for message in messages:
        aggregate_multimodal(message, scope3_row, logger)
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    if scope3ai_ctx is not None:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)
    else:
        return response  # uncovered


async def litellm_async_chat_wrapper(
    wrapped: Callable, instance: AsyncCompletions, args: Any, kwargs: Any
) -> Union[ChatCompletion, CustomStreamWrapper]:
    if kwargs.get("stream", False):
        return litellm_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await litellm_async_chat_wrapper_base(wrapped, instance, args, kwargs)


async def litellm_async_chat_wrapper_base(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    model = response.model
    if model is None:
        return response  # uncovered
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.total_tokens,
        request_duration_ms=float(request_latency) * 1000,
        managed_service_id=PROVIDER,
    )
    scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
    if scope3ai_ctx is not None:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)
    else:
        return response  # uncovered


async def litellm_async_chat_wrapper_stream(  # type: ignore[misc]
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> CustomStreamWrapper:
    timer_start = time.perf_counter()
    stream = await wrapped(*args, **kwargs)
    i = 0
    token_count = 0
    async for chunk in stream:
        if i > 0 and chunk.choices[0].finish_reason is None:
            token_count += 1
        request_latency = time.perf_counter() - timer_start
        model = chunk.model
        if model is not None:
            scope3_row = ImpactRow(
                model_id=model,
                output_tokens=token_count,
                request_duration_ms=float(request_latency) * 1000,
                managed_service_id=PROVIDER,
            )
            scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
            if scope3ai_ctx is not None:
                yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3ai_ctx)
            else:
                yield chunk  # uncovered
        else:
            yield chunk  # uncovered
        i += 1
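
All four wrappers take the (wrapped, instance, args, kwargs) signature used by the wrapt instrumentation library, so they are presumably attached to LiteLLM's completion entry points when the tracer is initialised. The sketch below shows one way such a registration could look; the use of wrapt and the target names ("litellm", "Completions.create", "AsyncCompletions.acreate") are assumptions for illustration and are not taken from this file.

# Hypothetical registration sketch. Assumes wrapt-style patching; the patched
# module/attribute names below are illustrative, not confirmed by this file.
import wrapt

from scope3ai.tracers.litellm.chat import (
    litellm_async_chat_wrapper,
    litellm_chat_wrapper,
)


def instrument_litellm() -> None:
    # Route synchronous Completions.create() calls through the sync wrapper,
    # which picks the streaming or non-streaming path from kwargs["stream"].
    wrapt.wrap_function_wrapper("litellm", "Completions.create", litellm_chat_wrapper)
    # Same idea for the async client (attribute name assumed).
    wrapt.wrap_function_wrapper(
        "litellm", "AsyncCompletions.acreate", litellm_async_chat_wrapper
    )

Once wrapped, a successful non-streaming call returns a ChatCompletion whose scope3ai attribute holds the Scope3AIContext produced by submit_impact; the keep_traces=True block in litellm_chat_wrapper_non_stream appears to exist so that, when an underlying provider tracer has already recorded the same request, its trace is reused instead of submitting a second ImpactRow.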