• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

scope3data / scope3ai-py / 12753874046

13 Jan 2025 06:40PM UTC coverage: 95.076% (+14.5%) from 80.557%
12753874046

Pull #61

github

3a8d3f
kevdevg
fix: vision pillow read bytes
Pull Request #61: feat(Hugging face): Vision methods - image classification / image segmentation / object detection

179 of 189 new or added lines in 5 files covered. (94.71%)

34 existing lines in 9 files now uncovered.

2008 of 2112 relevant lines covered (95.08%)

3.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.84
/scope3ai/tracers/huggingface/chat.py
1
import time
4✔
2
from collections.abc import AsyncIterable, Iterable
4✔
3
from dataclasses import asdict, dataclass
4✔
4
from typing import Any, Callable, Optional, Union
4✔
5

6
import tiktoken
4✔
7
from huggingface_hub import AsyncInferenceClient, InferenceClient  # type: ignore[import-untyped]
4✔
8
from huggingface_hub import ChatCompletionOutput as _ChatCompletionOutput
4✔
9
from huggingface_hub import ChatCompletionStreamOutput as _ChatCompletionStreamOutput
4✔
10
from requests import Response
4✔
11

12
from scope3ai.api.types import Scope3AIContext, Model, ImpactRow
4✔
13
from scope3ai.constants import PROVIDERS
4✔
14
from scope3ai.lib import Scope3AI
4✔
15
from scope3ai.response_interceptor.requests_interceptor import requests_response_capture
4✔
16

17
# Managed-service identifier passed as ``managed_service_id`` on every ImpactRow built in this module.
PROVIDER = PROVIDERS.HUGGINGFACE_HUB.value
4✔
18
# Task name handed to ``get_recommended_model`` when neither the client nor the call names a model.
HUGGING_FACE_CHAT_TASK = "chat"
4✔
19

20

21
@dataclass
class ChatCompletionOutput(_ChatCompletionOutput):
    """Chat completion output extended with an optional ``scope3ai`` field."""

    # Impact context for the request; stays None unless a wrapper assigns it.
    scope3ai: Union[Scope3AIContext, None] = None
4✔
24

25

26
@dataclass
class ChatCompletionStreamOutput(_ChatCompletionStreamOutput):
    """Streamed chat completion chunk extended with an optional ``scope3ai`` field."""

    # Impact context for the chunk; stays None unless a wrapper assigns it.
    scope3ai: Union[Scope3AIContext, None] = None
4✔
29

30

31
def huggingface_chat_wrapper(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> Union[ChatCompletionOutput, Iterable[ChatCompletionStreamOutput]]:
    """Route a chat call to the streaming or non-streaming tracer.

    The choice is driven solely by the ``stream`` keyword argument
    (treated as False when absent); all arguments are passed through
    untouched to the selected handler.
    """
    handler = (
        huggingface_chat_wrapper_stream
        if kwargs.get("stream", False)
        else huggingface_chat_wrapper_non_stream
    )
    return handler(wrapped, instance, args, kwargs)
4✔
38

39

40
def huggingface_chat_wrapper_non_stream(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> ChatCompletionOutput:
    """Run a blocking chat completion and attach its Scope3AI impact context.

    Args:
        wrapped: The original chat-completion callable being traced.
        instance: Client the call is bound to; used to resolve the model id.
        args: Positional arguments forwarded unchanged to ``wrapped``.
        kwargs: Keyword arguments forwarded unchanged to ``wrapped``.

    Returns:
        A ``ChatCompletionOutput`` rebuilt from the wrapped response, with
        ``scope3ai`` set to whatever ``submit_impact`` returned.
    """
    timer_start = time.perf_counter()
    http_response: Optional[Response] = None
    with requests_response_capture() as responses:
        response = wrapped(*args, **kwargs)
        http_responses = responses.get()
        if http_responses:
            http_response = http_responses[0]
    # Wall-clock duration of the wrapped call, used when the server does not
    # report a usable compute time.
    elapsed = time.perf_counter() - timer_start

    model = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )

    compute_time: Optional[float] = None
    # Compare against None explicitly: a requests.Response is falsy for
    # error status codes, which is not the same as "no response captured".
    if http_response is not None:
        raw_compute_time = http_response.headers.get("x-compute-time")
        try:
            # Presumably expressed in seconds (it is scaled to ms below).
            compute_time = float(raw_compute_time)
        except (TypeError, ValueError):
            # Header missing or not numeric: do not fail a request that
            # already succeeded, fall back to the measured duration.
            compute_time = None
    if compute_time is None:
        compute_time = elapsed

    scope3_row = ImpactRow(
        model=Model(id=model),
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        request_duration_ms=compute_time * 1000,
        managed_service_id=PROVIDER,
    )
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    chat = ChatCompletionOutput(**asdict(response))
    chat.scope3ai = scope3ai_ctx
    return chat
4✔
70

71

72
def huggingface_chat_wrapper_stream(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> Iterable[ChatCompletionStreamOutput]:
    """Yield traced chunks from a streaming chat completion.

    For every chunk produced by ``wrapped`` an impact row is submitted
    whose ``output_tokens`` is the number of chunks seen so far and whose
    duration is the time elapsed since this generator started running.
    Each chunk is re-wrapped as ``ChatCompletionStreamOutput`` and, when
    ``submit_impact`` returns a context, that context is attached to it.
    """
    started_at = time.perf_counter()
    chunks = wrapped(*args, **kwargs)
    model_id = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )
    # NOTE(review): one row is submitted per chunk with a cumulative count;
    # confirm downstream aggregation expects that.
    for chunks_seen, chunk in enumerate(chunks, start=1):
        elapsed = time.perf_counter() - started_at
        row = ImpactRow(
            model=Model(id=model_id),
            output_tokens=chunks_seen,
            request_duration_ms=elapsed * 1000,
            managed_service_id=PROVIDER,
        )
        traced_chunk = ChatCompletionStreamOutput(**asdict(chunk))
        ctx = Scope3AI.get_instance().submit_impact(row)
        if ctx is not None:
            traced_chunk.scope3ai = ctx
        yield traced_chunk
4✔
97

98

99
async def huggingface_async_chat_wrapper(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> Union[ChatCompletionOutput, AsyncIterable[ChatCompletionStreamOutput]]:
    """Route an async chat call to the streaming or non-streaming tracer.

    When ``stream`` is falsy or absent the non-streaming tracer is awaited
    and its result returned; otherwise the streaming tracer's async
    generator is returned without being awaited.
    """
    if not kwargs.get("stream", False):
        return await huggingface_async_chat_wrapper_non_stream(
            wrapped, instance, args, kwargs
        )
    return huggingface_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
108

109

110
async def huggingface_async_chat_wrapper_non_stream(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> ChatCompletionOutput:
    """Await a chat completion and attach its Scope3AI impact context.

    Args:
        wrapped: The original async chat-completion callable being traced.
        instance: Client the call is bound to; used to resolve the model id.
        args: Positional arguments forwarded unchanged to ``wrapped``.
        kwargs: Keyword arguments forwarded unchanged to ``wrapped``.

    Returns:
        A ``ChatCompletionOutput`` rebuilt from the wrapped response, with
        ``scope3ai`` set to whatever ``submit_impact`` returned.
    """
    timer_start = time.perf_counter()

    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    model = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )

    # Prefer the completion token count reported in the response usage,
    # matching the synchronous wrapper. Only estimate locally when the
    # response does not carry one.
    output_tokens = response.usage.completion_tokens
    if output_tokens is None:
        # Content can be None; encode an empty string rather than raising.
        content = response.choices[0].message.content or ""
        encoder = tiktoken.get_encoding("cl100k_base")
        output_tokens = len(encoder.encode(content))

    scope3_row = ImpactRow(
        model=Model(id=model),
        input_tokens=response.usage.prompt_tokens,
        output_tokens=output_tokens,
        request_duration_ms=request_latency * 1000,
        managed_service_id=PROVIDER,
    )

    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    chat = ChatCompletionOutput(**asdict(response))
    chat.scope3ai = scope3ai_ctx
    return chat
4✔
134

135

136
# TODO: work out how response headers (e.g. x-compute-time) can be read for streamed responses
137
async def huggingface_async_chat_wrapper_stream(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> AsyncIterable[ChatCompletionStreamOutput]:
    """Yield traced chunks from an async streaming chat completion.

    For every chunk produced by the awaited ``wrapped`` call an impact row
    is submitted whose ``output_tokens`` is the number of chunks seen so
    far and whose duration is the time elapsed since this generator
    started running. Each chunk is re-wrapped as
    ``ChatCompletionStreamOutput`` and, when ``submit_impact`` returns a
    context, that context is attached to it.

    Args:
        wrapped: The original async chat-completion callable being traced.
        instance: Client the call is bound to; used to resolve the model id.
        args: Positional arguments forwarded unchanged to ``wrapped``.
        kwargs: Keyword arguments forwarded unchanged to ``wrapped``.
    """
    timer_start = time.perf_counter()
    stream = await wrapped(*args, **kwargs)
    token_count = 0
    # Same fallback chain as the other wrappers: indexing kwargs["model"]
    # directly would raise KeyError when no model is given anywhere, after
    # the request has already been sent.
    model_used = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )
    async for chunk in stream:
        token_count += 1
        request_latency = time.perf_counter() - timer_start
        scope3_row = ImpactRow(
            model=Model(id=model_used),
            output_tokens=token_count,
            # TODO: can we get the header that has the processing time
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
        chunk_data = ChatCompletionStreamOutput(**asdict(chunk))
        if scope3_ctx is not None:
            chunk_data.scope3ai = scope3_ctx
        yield chunk_data
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc