scope3data / scope3ai-py, build 12681022279 (GitHub)

08 Jan 2025 11:43PM UTC. Coverage: 95.11% (+14.6% from 80.557%)

Pull Request #56: feat(huggingface): add support for image-to-image / text-to-speech
Commit da957e (kevdevg): "fix: add try catch to pillow images"

155 of 169 new or added lines in 7 files covered (91.72%)
26 existing lines in 4 files now uncovered
1770 of 1861 relevant lines covered (95.11%)
3.8 hits per line

Source file: /scope3ai/tracers/huggingface/chat.py (98.82% covered)
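
The file below is the Hugging Face chat tracer: it wraps InferenceClient.chat_completion (sync and async, streaming and non-streaming), measures tokens and request duration, submits an ImpactRow through Scope3AI.get_instance().submit_impact(), and attaches the resulting Scope3AIContext to the response object. As a rough usage sketch (the Scope3AI.init() entry point is an assumption inferred from Scope3AI.get_instance() in this file, not confirmed by it), a traced call might look like:

# Hypothetical usage sketch. Scope3AI.init() is assumed to install the
# tracers; the exact setup API is not shown in this file.
from huggingface_hub import InferenceClient

from scope3ai.lib import Scope3AI

Scope3AI.init()  # assumed entry point
client = InferenceClient()
response = client.chat_completion(
    messages=[{"role": "user", "content": "Hello!"}],
    max_tokens=64,
)
# With the wrapper active, the response is the ChatCompletionOutput
# subclass defined below, carrying the impact context.
print(response.scope3ai)

Once the wrapper is active, existing calling code keeps working unchanged; the only visible difference is the extra scope3ai field on the returned output.
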
import time
from collections.abc import AsyncIterable, Iterable
from dataclasses import asdict, dataclass
from typing import Any, Callable, Optional, Union

import tiktoken
from huggingface_hub import AsyncInferenceClient, InferenceClient  # type: ignore[import-untyped]
from huggingface_hub import ChatCompletionOutput as _ChatCompletionOutput
from huggingface_hub import ChatCompletionStreamOutput as _ChatCompletionStreamOutput
from requests import Response

from scope3ai.api.types import Scope3AIContext, Model, ImpactRow
from scope3ai.constants import PROVIDERS
from scope3ai.lib import Scope3AI
from scope3ai.response_interceptor.requests_interceptor import requests_response_capture

PROVIDER = PROVIDERS.HUGGINGFACE_HUB.value


# Subclasses of the Hugging Face output types with an extra field for the
# Scope3AI impact context attached by the wrappers below.
@dataclass
class ChatCompletionOutput(_ChatCompletionOutput):
    scope3ai: Optional[Scope3AIContext] = None


@dataclass
class ChatCompletionStreamOutput(_ChatCompletionStreamOutput):
    scope3ai: Optional[Scope3AIContext] = None


def huggingface_chat_wrapper(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> Union[ChatCompletionOutput, Iterable[ChatCompletionStreamOutput]]:
    # Dispatch to the streaming or non-streaming variant based on the call.
    if kwargs.get("stream", False):
        return huggingface_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return huggingface_chat_wrapper_non_stream(wrapped, instance, args, kwargs)


def huggingface_chat_wrapper_non_stream(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> ChatCompletionOutput:
    timer_start = time.perf_counter()
    http_response: Response | None = None
    # Capture the underlying HTTP response so the provider-reported
    # "x-compute-time" header can be used as the request duration.
    with requests_response_capture() as responses:
        response = wrapped(*args, **kwargs)
        http_responses = responses.get()
        if len(http_responses) > 0:
            http_response = http_responses[0]
    model = (
        instance.model or kwargs.get("model") or instance.get_recommended_model("chat")
    )
    if http_response:
        compute_time = http_response.headers.get("x-compute-time")
    else:
        # Fall back to wall-clock latency (uncovered in this report).
        compute_time = time.perf_counter() - timer_start
    scope3_row = ImpactRow(
        model=Model(id=model),
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        request_duration_ms=float(compute_time) * 1000,
        managed_service_id=PROVIDER,
    )
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    chat = ChatCompletionOutput(**asdict(response))
    chat.scope3ai = scope3ai_ctx
    return chat


def huggingface_chat_wrapper_stream(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> Iterable[ChatCompletionStreamOutput]:
    timer_start = time.perf_counter()
    stream = wrapped(*args, **kwargs)
    token_count = 0
    model = (
        instance.model or kwargs.get("model") or instance.get_recommended_model("chat")
    )
    # Each streamed chunk counts as one output token; the impact is
    # resubmitted for every chunk with the latency observed so far.
    for chunk in stream:
        token_count += 1
        request_latency = time.perf_counter() - timer_start
        scope3_row = ImpactRow(
            model=Model(id=model),
            output_tokens=token_count,
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        chunk_data = ChatCompletionStreamOutput(**asdict(chunk))
        scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
        if scope3_ctx is not None:
            chunk_data.scope3ai = scope3_ctx
        yield chunk_data


async def huggingface_async_chat_wrapper(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> Union[ChatCompletionOutput, AsyncIterable[ChatCompletionStreamOutput]]:
    if kwargs.get("stream", False):
        return huggingface_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await huggingface_async_chat_wrapper_non_stream(
            wrapped, instance, args, kwargs
        )


async def huggingface_async_chat_wrapper_non_stream(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> ChatCompletionOutput:
    timer_start = time.perf_counter()

    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    model = (
        instance.model or kwargs.get("model") or instance.get_recommended_model("chat")
    )
    # Output tokens are estimated by re-encoding the generated text with
    # tiktoken's cl100k_base encoding.
    encoder = tiktoken.get_encoding("cl100k_base")
    output_tokens = len(encoder.encode(response.choices[0].message.content))
    scope3_row = ImpactRow(
        model=Model(id=model),
        input_tokens=response.usage.prompt_tokens,
        output_tokens=output_tokens,
        request_duration_ms=request_latency * 1000,
        managed_service_id=PROVIDER,
    )

    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    chat = ChatCompletionOutput(**asdict(response))
    chat.scope3ai = scope3ai_ctx
    return chat


# TODO: work out how to read timing headers for streamed responses.
async def huggingface_async_chat_wrapper_stream(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> AsyncIterable[ChatCompletionStreamOutput]:
    timer_start = time.perf_counter()
    stream = await wrapped(*args, **kwargs)
    token_count = 0
    model_used = instance.model or kwargs["model"]
    async for chunk in stream:
        token_count += 1
        request_latency = time.perf_counter() - timer_start
        scope3_row = ImpactRow(
            model=Model(id=model_used),
            output_tokens=token_count,
            request_duration_ms=request_latency
            * 1000,  # TODO: use the header that reports processing time, if available
            managed_service_id=PROVIDER,
        )
        scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
        chunk_data = ChatCompletionStreamOutput(**asdict(chunk))
        if scope3_ctx is not None:
            chunk_data.scope3ai = scope3_ctx
        yield chunk_data
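
The (wrapped, instance, args, kwargs) signature of each wrapper matches the convention used by the wrapt library, which suggests the tracer is registered elsewhere in the package by patching the client methods. A minimal sketch of such a registration, assuming wrapt's wrap_function_wrapper is the mechanism (the real instrumentation module is not shown here and may differ):

# Minimal sketch, assuming wrapt-style instrumentation; the actual
# registration code lives outside this file.
from wrapt import wrap_function_wrapper

from scope3ai.tracers.huggingface.chat import (
    huggingface_async_chat_wrapper,
    huggingface_chat_wrapper,
)


def instrument_huggingface_chat() -> None:
    # Route InferenceClient.chat_completion through the sync wrapper and
    # AsyncInferenceClient.chat_completion through the async wrapper.
    wrap_function_wrapper(
        "huggingface_hub", "InferenceClient.chat_completion", huggingface_chat_wrapper
    )
    wrap_function_wrapper(
        "huggingface_hub",
        "AsyncInferenceClient.chat_completion",
        huggingface_async_chat_wrapper,
    )

Because each wrapper receives the original bound instance, it can fall back to instance.model or get_recommended_model("chat") when no model keyword is passed, as the sync wrappers above do.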