scope3data / scope3ai-py / build 13416015178

19 Feb 2025 03:18PM UTC coverage: 96.179% (+15.6%) from 80.557%

Pull Request #91: docs: minor readme edits
Merge b16436a44 into 37d564f57 (commit 404fae, web-flow, via github)

2542 of 2643 relevant lines covered (96.18%)
3.84 hits per line

Source File

/scope3ai/tracers/litellm/chat.py (95.28% covered)
import logging
import time
from typing import Any, Callable, Optional, Union

from litellm import AsyncCompletions, Completions
from litellm.types.utils import ModelResponse
from litellm.utils import CustomStreamWrapper
import tiktoken

from scope3ai import Scope3AI
from scope3ai.api.types import Scope3AIContext, ImpactRow
from scope3ai.tracers.utils.multimodal import (
    aggregate_multimodal,
    aggregate_multimodal_audio_content_output,
)


logger = logging.getLogger("scope3ai.tracers.litellm.chat")


class ChatCompletion(ModelResponse):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(ModelResponse):
    scope3ai: Optional[Scope3AIContext] = None


def litellm_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, CustomStreamWrapper]:
    if kwargs.get("stream", False):
        return litellm_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return litellm_chat_wrapper_non_stream(wrapped, instance, args, kwargs)


def litellm_chat_wrapper_stream(  # type: ignore[misc]
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> CustomStreamWrapper:
    timer_start = time.perf_counter()
    token_count = 0
    keep_traces = not kwargs.pop("use_always_litellm_tracer", False)
    with Scope3AI.get_instance().trace(keep_traces=keep_traces):
        stream = wrapped(*args, **kwargs)
    for i, chunk in enumerate(stream):
        if i > 0:
            token_count += 1
        if chunk.choices[0].finish_reason is None:
            yield chunk
            continue
        request_latency = time.perf_counter() - timer_start
        model = args[0] if len(args) > 0 else kwargs.get("model")
        messages = args[1] if len(args) > 1 else kwargs.get("messages")
        prompt = " ".join([message.get("content") for message in messages])
        encoder = tiktoken.get_encoding("cl100k_base")
        input_tokens = len(encoder.encode(prompt))
        if model is None:
            model = chunk.model  # not covered
        scope3_row = ImpactRow(
            model_id=model,
            input_tokens=input_tokens,
            output_tokens=token_count,
            request_duration_ms=float(request_latency) * 1000,
        )
        scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
        yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3ai_ctx)


def litellm_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    keep_traces = not kwargs.pop("use_always_litellm_tracer", False)
    modalities = kwargs.get("modalities", [])
    with Scope3AI.get_instance().trace(keep_traces=keep_traces) as tracer:
        response = wrapped(*args, **kwargs)
        if tracer.traces:
            setattr(response, "scope3ai", tracer.traces[0])
            return response
    request_latency = time.perf_counter() - timer_start
    model = args[0] if len(args) > 0 else kwargs.get("model")
    if model is None:
        model = response.model  # not covered
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.total_tokens,
        request_duration_ms=float(request_latency) * 1000,
    )
    if "audio" in modalities:
        audio_format = kwargs.get("audio", {}).get("format", "mp3")
        for choice in response.choices:
            audio_data = getattr(choice.message, "audio")
            if audio_data:
                audio_content = audio_data.data
                aggregate_multimodal_audio_content_output(
                    audio_content, audio_format, scope3_row
                )
    messages = args[1] if len(args) > 1 else kwargs.get("messages")
    for message in messages:
        aggregate_multimodal(message, scope3_row, logger)
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    if scope3ai_ctx is not None:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)
    else:
        return response  # not covered


async def litellm_async_chat_wrapper(
    wrapped: Callable, instance: AsyncCompletions, args: Any, kwargs: Any
) -> Union[ChatCompletion, CustomStreamWrapper]:
    if kwargs.get("stream", False):
        return litellm_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await litellm_async_chat_wrapper_base(wrapped, instance, args, kwargs)


async def litellm_async_chat_wrapper_base(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    keep_traces = not kwargs.pop("use_always_litellm_tracer", False)
    modalities = kwargs.get("modalities", [])
    with Scope3AI.get_instance().trace(keep_traces=keep_traces) as tracer:
        response = await wrapped(*args, **kwargs)
        if tracer.traces:
            setattr(response, "scope3ai", tracer.traces[0])
            return response
    request_latency = time.perf_counter() - timer_start
    model = args[0] if len(args) > 0 else kwargs.get("model")
    if model is None:
        model = response.model  # not covered
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.total_tokens,
        request_duration_ms=float(request_latency) * 1000,
    )
    if "audio" in modalities:
        audio_format = kwargs.get("audio", {}).get("format", "mp3")
        for choice in response.choices:
            audio_data = getattr(choice.message, "audio")
            if audio_data:
                audio_content = audio_data.data
                aggregate_multimodal_audio_content_output(
                    audio_content, audio_format, scope3_row
                )
    messages = args[1] if len(args) > 1 else kwargs.get("messages")
    for message in messages:
        aggregate_multimodal(message, scope3_row, logger)
    scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
    if scope3ai_ctx is not None:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)
    else:
        return response  # not covered


async def litellm_async_chat_wrapper_stream(  # type: ignore[misc]
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> CustomStreamWrapper:
    timer_start = time.perf_counter()
    keep_traces = not kwargs.pop("use_always_litellm_tracer", False)
    with Scope3AI.get_instance().trace(keep_traces=keep_traces):
        stream = await wrapped(*args, **kwargs)
    i = 0
    token_count = 0
    async for chunk in stream:
        if i > 0:
            token_count += 1
        if chunk.choices[0].finish_reason is None:
            i += 1
            yield chunk
            continue
        request_latency = time.perf_counter() - timer_start
        model = args[0] if len(args) > 0 else kwargs.get("model")
        messages = args[1] if len(args) > 1 else kwargs.get("messages")
        prompt = " ".join([message.get("content") for message in messages])
        encoder = tiktoken.get_encoding("cl100k_base")
        input_tokens = len(encoder.encode(prompt))
        if model is None:
            model = chunk.model  # not covered
        scope3_row = ImpactRow(
            model_id=model,
            input_tokens=input_tokens,
            output_tokens=token_count,
            request_duration_ms=float(request_latency) * 1000,
        )
        scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
        yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3ai_ctx)