scope3data / scope3ai-py / build 13040707031

29 Jan 2025 09:23PM UTC coverage: 96.412% (+15.9%) from 80.557%

Pull Request #84: feat: multimodal output audio for OpenAi and Litellm
Commit 24322d by kevdevg, via github ("feat: multimodal output for openain/litellm")

33 of 34 new or added lines in 3 files covered (97.06%).
54 existing lines in 10 files are now uncovered.
2472 of 2564 relevant lines covered (96.41%).
3.85 hits per line.

Source File: /scope3ai/tracers/litellm/chat.py (90.29% of lines covered)
In the listing below, lines that the test suite did not execute are marked with a trailing "# uncovered" comment; every other line was hit by the tests.

import logging
import time
from typing import Any, Callable, Optional, Union

from litellm import AsyncCompletions, Completions
from litellm.types.utils import ModelResponse
from litellm.utils import CustomStreamWrapper

from scope3ai import Scope3AI
from scope3ai.api.types import Scope3AIContext, ImpactRow
from scope3ai.constants import PROVIDERS
from scope3ai.tracers.utils.multimodal import (
    aggregate_multimodal,
    aggregate_multimodal_audio_content_output,
)

PROVIDER = PROVIDERS.LITELLM.value

logger = logging.getLogger("scope3ai.tracers.litellm.chat")


class ChatCompletion(ModelResponse):
    scope3ai: Optional[Scope3AIContext] = None


class ChatCompletionChunk(ModelResponse):
    scope3ai: Optional[Scope3AIContext] = None


def litellm_chat_wrapper(
    wrapped: Callable, instance: Completions, args: Any, kwargs: Any
) -> Union[ChatCompletion, CustomStreamWrapper]:
    if kwargs.get("stream", False):
        return litellm_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return litellm_chat_wrapper_non_stream(wrapped, instance, args, kwargs)


def litellm_chat_wrapper_stream(  # type: ignore[misc]
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> CustomStreamWrapper:
    timer_start = time.perf_counter()
    stream = wrapped(*args, **kwargs)
    token_count = 0
    for i, chunk in enumerate(stream):
        if i > 0 and chunk.choices[0].finish_reason is None:
            token_count += 1
        request_latency = time.perf_counter() - timer_start

        model = chunk.model
        if model is not None:
            scope3_row = ImpactRow(
                model_id=model,
                output_tokens=token_count,
                request_duration_ms=float(request_latency) * 1000,
                managed_service_id=PROVIDER,
            )
            scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
            if scope3ai_ctx is not None:
                yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3ai_ctx)
            else:
                yield chunk  # uncovered
        else:
            yield chunk  # uncovered


def litellm_chat_wrapper_non_stream(
    wrapped: Callable,
    instance: Completions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    keep_traces = not kwargs.pop("use_always_litellm_tracer", False)
    modalities = kwargs.get("modalities", [])
    with Scope3AI.get_instance().trace(keep_traces=keep_traces) as tracer:
        response = wrapped(*args, **kwargs)
        if tracer.traces:
            setattr(response, "scope3ai", tracer.traces[0])
            return response
    request_latency = time.perf_counter() - timer_start
    model = response.model
    if model is None:
        return response  # uncovered
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.total_tokens,
        request_duration_ms=float(request_latency) * 1000,
        managed_service_id=PROVIDER,
    )
    if "audio" in modalities:
        audio_format = kwargs.get("audio", {}).get("format", "mp3")
        for choice in response.choices:
            audio_data = getattr(choice.message, "audio")
            if audio_data:
                audio_content = audio_data.data
                aggregate_multimodal_audio_content_output(
                    audio_content, audio_format, scope3_row
                )
    messages = args[1] if len(args) > 1 else kwargs.get("messages")
    for message in messages:
        aggregate_multimodal(message, scope3_row, logger)
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    if scope3ai_ctx is not None:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)
    else:
        return response  # uncovered


async def litellm_async_chat_wrapper(
    wrapped: Callable, instance: AsyncCompletions, args: Any, kwargs: Any
) -> Union[ChatCompletion, CustomStreamWrapper]:
    if kwargs.get("stream", False):
        return litellm_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await litellm_async_chat_wrapper_base(wrapped, instance, args, kwargs)


async def litellm_async_chat_wrapper_base(
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> ChatCompletion:
    timer_start = time.perf_counter()
    keep_traces = not kwargs.pop("use_always_litellm_tracer", False)
    with Scope3AI.get_instance().trace(keep_traces=keep_traces) as tracer:
        response = await wrapped(*args, **kwargs)
        if tracer.traces:
            setattr(response, "scope3ai", tracer.traces[0])  # uncovered
            return response  # uncovered
    request_latency = time.perf_counter() - timer_start
    model = response.model
    if model is None:
        return response  # uncovered
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.total_tokens,
        request_duration_ms=float(request_latency) * 1000,
        managed_service_id=PROVIDER,
    )
    scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
    if scope3ai_ctx is not None:
        return ChatCompletion(**response.model_dump(), scope3ai=scope3ai_ctx)
    else:
        return response  # uncovered


async def litellm_async_chat_wrapper_stream(  # type: ignore[misc]
    wrapped: Callable,
    instance: AsyncCompletions,  # noqa: ARG001
    args: Any,
    kwargs: Any,
) -> CustomStreamWrapper:
    timer_start = time.perf_counter()
    stream = await wrapped(*args, **kwargs)
    i = 0
    token_count = 0
    async for chunk in stream:
        if i > 0 and chunk.choices[0].finish_reason is None:
            token_count += 1
        request_latency = time.perf_counter() - timer_start
        model = chunk.model
        if model is not None:
            scope3_row = ImpactRow(
                model_id=model,
                output_tokens=token_count,
                request_duration_ms=float(request_latency) * 1000,
                managed_service_id=PROVIDER,
            )
            scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
            if scope3ai_ctx is not None:
                yield ChatCompletionChunk(**chunk.model_dump(), scope3ai=scope3ai_ctx)
            else:
                yield chunk  # uncovered
        else:
            yield chunk  # uncovered
        i += 1
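
For context on the new audio path this PR adds, the sketch below shows roughly how a caller would exercise the '"audio" in modalities' branch of litellm_chat_wrapper_non_stream above. It is a minimal illustration, not part of the traced module: the Scope3AI.init() call, the model name, and the voice are assumptions about how the SDK is typically set up; the modalities and audio={"format": ...} keyword arguments and the scope3ai attribute on the response are the ones the wrapper itself reads and attaches.

# Minimal usage sketch (assumptions: scope3ai instruments litellm's completion
# call as shown in the wrapper above, Scope3AI.init() is the entry point, and
# the chosen model supports audio output; none of this is asserted by the file).
import litellm

from scope3ai import Scope3AI

Scope3AI.init()  # assumed initialization of the tracer

response = litellm.completion(
    model="openai/gpt-4o-audio-preview",       # hypothetical audio-capable model
    modalities=["text", "audio"],              # triggers the '"audio" in modalities' branch
    audio={"voice": "alloy", "format": "wav"}, # "format" is read by the wrapper (defaults to "mp3")
    messages=[{"role": "user", "content": "Say hello as audio."}],
)

# When submit_impact() returns a context, the wrapper returns a ChatCompletion
# carrying an extra `scope3ai` field; otherwise the raw litellm response passes through.
if getattr(response, "scope3ai", None) is not None:
    print("impact context attached:", response.scope3ai)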