
scope3data / scope3ai-py, build 13396554446

18 Feb 2025 05:28PM UTC. Coverage: 96.179% (+15.6%) from 80.557%.

Pull Request #91: docs: minor readme edits (merge a1470984a into 37d564f57)

2542 of 2643 relevant lines covered (96.18%), 3.84 hits per line.

Source file: /scope3ai/tracers/huggingface/chat.py (98.81% covered)
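
# Tracer for Hugging Face chat completions: wraps the sync and async
# InferenceClient entry points, measures request duration, builds an
# ImpactRow per call, and attaches the resulting Scope3AIContext to the
# response handed back to the caller.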

import time
from collections.abc import AsyncIterable, Iterable
from dataclasses import asdict, dataclass
from typing import Any, Callable, Optional, Union

import tiktoken
from huggingface_hub import (  # type: ignore[import-untyped]
    AsyncInferenceClient,
    InferenceClient,
)
from huggingface_hub import ChatCompletionOutput as _ChatCompletionOutput
from huggingface_hub import ChatCompletionStreamOutput as _ChatCompletionStreamOutput
from requests import Response

from scope3ai.api.types import ImpactRow, Scope3AIContext
from scope3ai.lib import Scope3AI
from scope3ai.response_interceptor.requests_interceptor import requests_response_capture

HUGGING_FACE_CHAT_TASK = "chat"
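

# huggingface_hub output types extended with a `scope3ai` field so callers
# can read the impact context directly off the returned object.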
@dataclass
class ChatCompletionOutput(_ChatCompletionOutput):
    scope3ai: Optional[Scope3AIContext] = None


@dataclass
class ChatCompletionStreamOutput(_ChatCompletionStreamOutput):
    scope3ai: Optional[Scope3AIContext] = None
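

# Entry-point wrappers. Each receives the original chat-completion method
# plus its call arguments, dispatches on the `stream` flag, and returns
# either one enriched completion or a generator of enriched stream chunks.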
def huggingface_chat_wrapper(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> Union[ChatCompletionOutput, Iterable[ChatCompletionStreamOutput]]:
    if kwargs.get("stream", False):
        return huggingface_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return huggingface_chat_wrapper_non_stream(wrapped, instance, args, kwargs)


def huggingface_chat_wrapper_non_stream(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> ChatCompletionOutput:
    timer_start = time.perf_counter()
    http_response: Response | None = None
    with requests_response_capture() as responses:
        response = wrapped(*args, **kwargs)
        http_responses = responses.get()
        if http_responses:
            http_response = http_responses[0]
    model = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )
    # Prefer the server-reported "x-compute-time" header; fall back to the
    # locally measured wall-clock latency when it is unavailable.
    compute_time: str | float | None = None
    if http_response:
        compute_time = http_response.headers.get("x-compute-time")
    if compute_time is None:
        compute_time = time.perf_counter() - timer_start
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        request_duration_ms=float(compute_time) * 1000,
    )
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    chat = ChatCompletionOutput(**asdict(response))
    chat.scope3ai = scope3ai_ctx
    return chat
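

# Streaming variants: the wrappers below approximate output tokens by
# counting chunks and attach a refreshed impact context to every chunk
# they yield.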
def huggingface_chat_wrapper_stream(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> Iterable[ChatCompletionStreamOutput]:
    timer_start = time.perf_counter()
    stream = wrapped(*args, **kwargs)
    token_count = 0
    model = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )
    for chunk in stream:
        token_count += 1
        request_latency = time.perf_counter() - timer_start
        scope3_row = ImpactRow(
            model_id=model,
            output_tokens=token_count,
            request_duration_ms=request_latency * 1000,
        )
        chunk_data = ChatCompletionStreamOutput(**asdict(chunk))
        scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
        if scope3_ctx is not None:
            chunk_data.scope3ai = scope3_ctx
        yield chunk_data


async def huggingface_async_chat_wrapper(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> Union[ChatCompletionOutput, AsyncIterable[ChatCompletionStreamOutput]]:
    if kwargs.get("stream", False):
        return huggingface_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await huggingface_async_chat_wrapper_non_stream(
            wrapped, instance, args, kwargs
        )


async def huggingface_async_chat_wrapper_non_stream(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> ChatCompletionOutput:
    timer_start = time.perf_counter()

    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    model = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )
    # Estimate output tokens by encoding the completion text with tiktoken.
    encoder = tiktoken.get_encoding("cl100k_base")
    output_tokens = len(encoder.encode(response.choices[0].message.content))
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=output_tokens,
        request_duration_ms=request_latency * 1000,
    )

    scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
    chat = ChatCompletionOutput(**asdict(response))
    chat.scope3ai = scope3ai_ctx
    return chat


# TODO: work out how to read timing headers for streamed responses.
async def huggingface_async_chat_wrapper_stream(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> AsyncIterable[ChatCompletionStreamOutput]:
    timer_start = time.perf_counter()
    stream = await wrapped(*args, **kwargs)
    token_count = 0
    model = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )
    async for chunk in stream:
        token_count += 1
        request_latency = time.perf_counter() - timer_start
        scope3_row = ImpactRow(
            model_id=model,
            output_tokens=token_count,
            # TODO: can we get the header that has the processing time?
            request_duration_ms=request_latency * 1000,
        )
        scope3_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
        chunk_data = ChatCompletionStreamOutput(**asdict(chunk))
        if scope3_ctx is not None:
            chunk_data.scope3ai = scope3_ctx
        yield chunk_data
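
The wrappers are never attached to huggingface_hub within this file; their
(wrapped, instance, args, kwargs) signature matches the convention used by the
wrapt library, which suggests the patching lives elsewhere in the package. A
minimal sketch of how such a hookup might look, assuming wrapt is available
(the registration calls shown here are illustrative, not scope3ai's actual
mechanism):

import wrapt

from scope3ai.tracers.huggingface.chat import (
    huggingface_async_chat_wrapper,
    huggingface_chat_wrapper,
)

# Route every chat_completion call, sync and async, through the tracers.
wrapt.wrap_function_wrapper(
    "huggingface_hub", "InferenceClient.chat_completion", huggingface_chat_wrapper
)
wrapt.wrap_function_wrapper(
    "huggingface_hub",
    "AsyncInferenceClient.chat_completion",
    huggingface_async_chat_wrapper,
)

Once patched, an ordinary call returns the subclassed output with the impact
context attached (assuming the Scope3AI singleton has been configured):

from huggingface_hub import InferenceClient

client = InferenceClient()
result = client.chat_completion(
    messages=[{"role": "user", "content": "Hello"}],
    model="HuggingFaceH4/zephyr-7b-beta",  # any hosted chat model
)
print(result.scope3ai)  # Scope3AIContext, or None if submission failed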