scope3data / scope3ai-py · build 12920205472

23 Jan 2025 01:12AM UTC · coverage: 95.993% (+15.4%) from 80.557%

Pull Request #74: feat(metadata): include many metadata accessible at global or tracer level
Commit 9af6ef by tito ("fix: try fixing test again", via github CI)

49 of 50 new or added lines in 4 files covered (98.0%).
53 existing lines in 10 files are now uncovered.
2204 of 2296 relevant lines covered (95.99%).
3.84 hits per line.

Source File: /scope3ai/tracers/huggingface/chat.py (98.84% covered)

import time
from collections.abc import AsyncIterable, Iterable
from dataclasses import asdict, dataclass
from typing import Any, Callable, Optional, Union

import tiktoken
from huggingface_hub import (  # type: ignore[import-untyped]
    AsyncInferenceClient,
    InferenceClient,
)
from huggingface_hub import ChatCompletionOutput as _ChatCompletionOutput
from huggingface_hub import ChatCompletionStreamOutput as _ChatCompletionStreamOutput
from requests import Response

from scope3ai.api.types import ImpactRow, Scope3AIContext
from scope3ai.constants import PROVIDERS
from scope3ai.lib import Scope3AI
from scope3ai.response_interceptor.requests_interceptor import requests_response_capture

PROVIDER = PROVIDERS.HUGGINGFACE_HUB.value
HUGGING_FACE_CHAT_TASK = "chat"


@dataclass
class ChatCompletionOutput(_ChatCompletionOutput):
    scope3ai: Optional[Scope3AIContext] = None


@dataclass
class ChatCompletionStreamOutput(_ChatCompletionStreamOutput):
    scope3ai: Optional[Scope3AIContext] = None


def huggingface_chat_wrapper(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> Union[ChatCompletionOutput, Iterable[ChatCompletionStreamOutput]]:
    # Dispatch to the streaming or non-streaming variant.
    if kwargs.get("stream", False):
        return huggingface_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return huggingface_chat_wrapper_non_stream(wrapped, instance, args, kwargs)


def huggingface_chat_wrapper_non_stream(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> ChatCompletionOutput:
    timer_start = time.perf_counter()
    http_response: Optional[Response] = None
    # Capture the underlying HTTP response so timing can be read from headers.
    with requests_response_capture() as responses:
        response = wrapped(*args, **kwargs)
        http_responses = responses.get()
        if http_responses:
            http_response = http_responses[0]
    model = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )
    if http_response:
        # Server-reported compute time, in seconds.
        compute_time = http_response.headers.get("x-compute-time")
    else:
        # Wall-clock fallback (the only lines not covered in this build).
        compute_time = time.perf_counter() - timer_start
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        request_duration_ms=float(compute_time) * 1000,
        managed_service_id=PROVIDER,
    )
    scope3ai_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
    chat = ChatCompletionOutput(**asdict(response))
    chat.scope3ai = scope3ai_ctx
    return chat


def huggingface_chat_wrapper_stream(
    wrapped: Callable, instance: InferenceClient, args: Any, kwargs: Any
) -> Iterable[ChatCompletionStreamOutput]:
    timer_start = time.perf_counter()
    stream = wrapped(*args, **kwargs)
    token_count = 0
    model = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )
    # Submit an updated impact row for each chunk of the stream.
    for chunk in stream:
        token_count += 1
        request_latency = time.perf_counter() - timer_start
        scope3_row = ImpactRow(
            model_id=model,
            output_tokens=token_count,
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        chunk_data = ChatCompletionStreamOutput(**asdict(chunk))
        scope3_ctx = Scope3AI.get_instance().submit_impact(scope3_row)
        if scope3_ctx is not None:
            chunk_data.scope3ai = scope3_ctx
        yield chunk_data


async def huggingface_async_chat_wrapper(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> Union[ChatCompletionOutput, AsyncIterable[ChatCompletionStreamOutput]]:
    if kwargs.get("stream", False):
        return huggingface_async_chat_wrapper_stream(wrapped, instance, args, kwargs)
    else:
        return await huggingface_async_chat_wrapper_non_stream(
            wrapped, instance, args, kwargs
        )


async def huggingface_async_chat_wrapper_non_stream(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> ChatCompletionOutput:
    timer_start = time.perf_counter()

    response = await wrapped(*args, **kwargs)
    request_latency = time.perf_counter() - timer_start
    model = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )
    # Output tokens are estimated with tiktoken rather than read from usage.
    encoder = tiktoken.get_encoding("cl100k_base")
    output_tokens = len(encoder.encode(response.choices[0].message.content))
    scope3_row = ImpactRow(
        model_id=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=output_tokens,
        request_duration_ms=request_latency * 1000,
        managed_service_id=PROVIDER,
    )

    scope3ai_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
    chat = ChatCompletionOutput(**asdict(response))
    chat.scope3ai = scope3ai_ctx
    return chat


# TODO: figure out how response headers work for streaming requests.
async def huggingface_async_chat_wrapper_stream(
    wrapped: Callable, instance: AsyncInferenceClient, args: Any, kwargs: Any
) -> AsyncIterable[ChatCompletionStreamOutput]:
    timer_start = time.perf_counter()
    stream = await wrapped(*args, **kwargs)
    token_count = 0
    model = (
        instance.model
        or kwargs.get("model")
        or instance.get_recommended_model(HUGGING_FACE_CHAT_TASK)
    )
    async for chunk in stream:
        token_count += 1
        request_latency = time.perf_counter() - timer_start
        scope3_row = ImpactRow(
            model_id=model,
            output_tokens=token_count,
            # TODO: can we get the header that has the processing time?
            request_duration_ms=request_latency * 1000,
            managed_service_id=PROVIDER,
        )
        scope3_ctx = await Scope3AI.get_instance().asubmit_impact(scope3_row)
        chunk_data = ChatCompletionStreamOutput(**asdict(chunk))
        if scope3_ctx is not None:
            chunk_data.scope3ai = scope3_ctx
        yield chunk_data
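
The wrapper signature used throughout this file (wrapped, instance, args, kwargs) matches the wrapt convention, so the tracer most likely attaches these functions to the Hugging Face client via function wrappers. The sketch below shows that wiring under two assumptions not confirmed by this report: that wrapt is the patching mechanism, and that InferenceClient.chat_completion / AsyncInferenceClient.chat_completion are the patched targets.

    import wrapt

    from scope3ai.tracers.huggingface.chat import (
        huggingface_async_chat_wrapper,
        huggingface_chat_wrapper,
    )

    # Hypothetical wiring (assumed, not taken from this report): route every
    # chat_completion call through the tracing wrapper so it submits an
    # ImpactRow and returns an output object carrying a scope3ai context.
    wrapt.wrap_function_wrapper(
        "huggingface_hub",
        "InferenceClient.chat_completion",
        huggingface_chat_wrapper,
    )
    wrapt.wrap_function_wrapper(
        "huggingface_hub",
        "AsyncInferenceClient.chat_completion",
        huggingface_async_chat_wrapper,
    )

Once instrumentation like this is active, a plain client.chat_completion(messages=[...]) call behaves as usual, and the returned ChatCompletionOutput additionally carries the submitted impact context in its scope3ai field.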