• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IBM / unitxt / 16027414148

02 Jul 2025 02:07PM UTC coverage: 81.221% (+0.2%) from 81.048%
16027414148

Pull #1853

github

web-flow
Merge 2dcd127a5 into ec37c34d7
Pull Request #1853: Add initial multi threading support and tests

1550 of 1919 branches covered (80.77%)

Branch coverage included in aggregate %.

10521 of 12943 relevant lines covered (81.29%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.73
src/unitxt/utils.py
1
import copy
1✔
2
import functools
1✔
3
import importlib.util
1✔
4
import json
1✔
5
import os
1✔
6
import random
1✔
7
import re
1✔
8
import time
1✔
9
from collections import OrderedDict
1✔
10
from contextvars import ContextVar
1✔
11
from functools import wraps
1✔
12
from typing import Any, Dict, Optional
1✔
13
from urllib.error import HTTPError as UrllibHTTPError
1✔
14

15
from requests.exceptions import ConnectionError, HTTPError
1✔
16
from requests.exceptions import Timeout as TimeoutError
1✔
17

18
from .logging_utils import get_logger
1✔
19
from .settings_utils import get_settings
1✔
20
from .text_utils import is_made_of_sub_strings
1✔
21

22
# Module-level singletons shared by the helpers in this file.
logger = get_logger()
settings = get_settings()
24

25

26
def retry_connection_with_exponential_backoff(
    max_retries=None,
    retry_exceptions=(
        ConnectionError,
        TimeoutError,
        HTTPError,
        FileNotFoundError,
        UrllibHTTPError,
    ),
    backoff_factor=1,
):
    """Decorator that implements retry with exponential backoff for network operations.

    Also handles errors that were triggered by the specified retry exceptions,
    whether they're direct causes or part of the exception context.

    Args:
        max_retries: Maximum number of retry attempts (falls back to settings if None)
        retry_exceptions: Tuple of exceptions that should trigger a retry
        backoff_factor: Base delay factor in seconds for backoff calculation

    Returns:
        The decorated function with retry logic

    Raises:
        ValueError: If the effective retry count is less than 1, so the
            wrapped function could never be called.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Get max_retries from settings if not provided
            retries = (
                max_retries
                if max_retries is not None
                else settings.max_connection_retries
            )

            for attempt in range(retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Check if this exception or any of its causes match the retry exceptions
                    should_retry = False
                    current_exc = e

                    # Check the exception chain for both __cause__ (explicit) and __context__ (implicit)
                    visited_exceptions = (
                        set()
                    )  # To prevent infinite loops in rare cyclic exception references

                    while (
                        current_exc is not None
                        and id(current_exc) not in visited_exceptions
                    ):
                        visited_exceptions.add(id(current_exc))

                        if isinstance(current_exc, retry_exceptions):
                            should_retry = True
                            break

                        # First check __cause__ (from "raise X from Y")
                        if current_exc.__cause__ is not None:
                            current_exc = current_exc.__cause__
                        # Then check __context__ (from "try: ... except: raise X")
                        elif current_exc.__context__ is not None:
                            current_exc = current_exc.__context__
                        else:
                            # No more causes in the chain
                            break

                    if not should_retry:
                        # Not a retry exception or caused by a retry exception, so re-raise
                        raise

                    if attempt >= retries - 1:  # Last attempt
                        raise  # Re-raise the last exception

                    # Exponential backoff with jitter so concurrent callers
                    # don't all retry in lockstep.
                    wait_time = backoff_factor * (2**attempt) + random.uniform(0, 1)
                    logger.warning(
                        f"{func.__name__} failed (attempt {attempt+1}/{retries}). "
                        f"Retrying in {wait_time:.2f}s. Error: {e!s}"
                    )
                    time.sleep(wait_time)

            # Only reachable when retries < 1: the loop body never ran.
            # (The previous message, "there was a problem", hid this cause.)
            raise ValueError(
                f"Cannot call '{func.__name__}': max_retries={retries} must be at least 1."
            ) from None

        return wrapper

    return decorator
114

115

116
class Singleton(type):
    """Metaclass that caches exactly one instance per class.

    The first call to ``Cls(...)`` constructs the instance; every later
    call returns that same cached object (constructor arguments are then
    ignored).
    """

    _instances = {}

    def __call__(cls, *args, **kwargs):
        instance = cls._instances.get(cls)
        if instance is None:
            instance = super().__call__(*args, **kwargs)
            cls._instances[cls] = instance
        return instance
123

124

125
class LRUCache:
    """A least-recently-used cache with per-context storage.

    The backing ``OrderedDict`` is held in a ``ContextVar``, so each
    execution context (e.g. asyncio task) sees its own cache contents.
    When ``max_size`` is ``None`` the cache grows without bound.
    """

    def __init__(self, max_size: Optional[int] = 10):
        self._max_size = max_size
        self._context_cache = ContextVar("context_lru_cache", default=None)

    def _get_cache(self):
        # Lazily create the per-context store on first access.
        store = self._context_cache.get()
        if store is None:
            store = OrderedDict()
            self._context_cache.set(store)
        return store

    def __setitem__(self, key, value):
        store = self._get_cache()
        store[key] = value
        # Writing marks the key as most recently used.
        store.move_to_end(key)
        if self._max_size is not None:
            # Evict from the least-recently-used end until within bounds.
            while len(store) > self._max_size:
                store.popitem(last=False)

    def __getitem__(self, key):
        store = self._get_cache()
        if key not in store:
            raise KeyError(f"{key} not found in cache")
        # Reading also refreshes recency.
        store.move_to_end(key)
        return store[key]

    def get(self, key, default=None):
        store = self._get_cache()
        if key not in store:
            return default
        store.move_to_end(key)
        return store[key]

    def clear(self):
        """Clear all items from the cache."""
        self._get_cache().clear()

    def __contains__(self, key):
        return key in self._get_cache()

    def __len__(self):
        return len(self._get_cache())

    def __repr__(self):
        return f"LRUCache(max_size={self._max_size}, items={list(self._get_cache().items())})"
175

176

177
def lru_cache_decorator(max_size=128):
    """Decorator factory memoizing a function with an :class:`LRUCache`.

    The cache key is the positional arguments tuple, extended with the
    sorted keyword items when any are given. The wrapper exposes
    ``cache_clear()`` to empty the cache.
    """

    def decorator(func):
        memo = LRUCache(max_size=max_size)

        @wraps(func)
        def wrapper(*args, **kwargs):
            cache_key = args
            if kwargs:
                cache_key = cache_key + tuple(sorted(kwargs.items()))
            if cache_key in memo:
                return memo[cache_key]
            value = func(*args, **kwargs)
            memo[cache_key] = value
            return value

        wrapper.cache_clear = memo.clear
        return wrapper

    return decorator
196

197

198
@lru_cache_decorator(max_size=None)
def artifacts_json_cache(artifact_path):
    """Load the JSON artifact at *artifact_path*, memoized per path (unbounded)."""
    return load_json(artifact_path)
201

202

203
def flatten_dict(
    d: Dict[str, Any], parent_key: str = "", sep: str = "_"
) -> Dict[str, Any]:
    """Flatten a nested dictionary into one level, joining keys with *sep*."""
    flat: Dict[str, Any] = {}
    for key, value in d.items():
        compound_key = parent_key + sep + key if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested dicts, carrying the accumulated key prefix.
            flat.update(flatten_dict(value, compound_key, sep=sep))
        else:
            flat[compound_key] = value
    return flat
215

216

217
def load_json(path):
    """Load and parse the JSON file at *path*.

    Raises:
        RuntimeError: if the file is not valid JSON; the message includes
            the raw file content and the original ``JSONDecodeError`` is
            attached as the cause.
    """
    with open(path) as f:
        try:
            return json.load(f)
        except json.decoder.JSONDecodeError as e:
            # Re-read the raw text for the error message. Using read() keeps
            # the file's original line breaks; the previous
            # "\n".join(f.readlines()) doubled every newline because
            # readlines() already keeps the trailing "\n" on each line.
            with open(path) as error_file:
                file_content = error_file.read()
            raise RuntimeError(
                f"Failed to decode json file at '{path}' with file content:\n{file_content}"
            ) from e
227

228

229
def save_to_file(path, data):
    """Write the string *data* to *path*, terminated by a single newline."""
    with open(path, "w") as out:
        out.write(data + "\n")
233

234

235
def json_dump(data):
    """Serialize *data* as pretty-printed JSON text (4-space indent, non-ASCII kept)."""
    return json.dumps(data, ensure_ascii=False, indent=4)
237

238

239
def is_package_installed(package_name):
    """Check if a package is installed.

    Parameters:
    - package_name (str): The name of the package to check.

    Returns:
    - bool: True if the package is installed, False otherwise.
    """
    return importlib.util.find_spec(package_name) is not None
250

251

252
def is_module_available(module_name):
    """Check if a module is available in the current Python environment.

    Parameters:
    - module_name (str): The name of the module to check.

    Returns:
    - bool: True if the module is available, False otherwise.
    """
    try:
        importlib.import_module(module_name)
    except ImportError:
        return False
    return True
266

267

268
def remove_numerics_and_quoted_texts(input_str):
    """Strip numeric literals and quoted string literals from *input_str*.

    Used to reduce an expression to its bare identifiers/operators before
    validating it against an allow-list (see ``safe_eval``).
    """
    # Remove triple-quoted strings first: the plain double-quote pattern
    # below would otherwise consume the quote pairs of a multi-line
    # """...""" block and leave its contents behind (the DOTALL pass
    # previously ran last and never matched anything).
    input_str = re.sub(r'""".*?"""', "", input_str, flags=re.DOTALL)

    # Remove floats before integers to avoid leaving stray periods
    input_str = re.sub(r"\d+\.\d+", "", input_str)

    # Remove integers
    input_str = re.sub(r"\d+", "", input_str)

    # Remove strings in single quotes
    input_str = re.sub(r"'.*?'", "", input_str)

    # Remove strings in double quotes
    return re.sub(r'".*?"', "", input_str)
283

284

285
def safe_eval(expression: str, context: dict, allowed_tokens: list) -> any:
    """Evaluate *expression* in a restricted environment.

    Only tokens present in *allowed_tokens* or among the keys of *context*
    may appear in the expression (numeric and quoted literals are exempt
    from the check).

    Args:
        expression (str): The expression to evaluate.
        context (dict): A dictionary mapping variable names to their values, which
                        can be used in the expression.
        allowed_tokens (list): A list of strings representing allowed tokens (such as
                               operators, function names, etc.) that can be used in the expression.

    Returns:
        any: The result of evaluating the expression.

    Raises:
        ValueError: If the expression contains tokens not in the allowed list or context keys.

    Note:
        This function should be used carefully, as it employs `eval`, which can
        execute arbitrary code. The function attempts to mitigate security risks
        by restricting the available tokens and not exposing built-in functions.
    """
    allowed_sub_strings = list(context.keys()) + allowed_tokens
    stripped_expression = remove_numerics_and_quoted_texts(expression)
    if not is_made_of_sub_strings(stripped_expression, allowed_sub_strings):
        raise ValueError(
            f"The expression '{expression}' can not be evaluated because it contains tokens outside the allowed list of {allowed_sub_strings}."
        )
    # Builtins are stripped so the expression cannot reach import, open, etc.
    return eval(expression, {"__builtins__": {}}, context)
314

315

316
def import_module_from_file(file_path):
    """Dynamically load and return a Python module from an arbitrary file path.

    The module's name is the file's base name without its extension.
    """
    module_name = os.path.splitext(os.path.basename(file_path))[0]
    # Build a spec for the file, materialize a module from it, then execute
    # the file's code inside that module's namespace.
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
326

327

328
def deep_copy(obj):
    """Return a fully independent duplicate of *obj*.

    Args:
        obj: The object to be deep copied.

    Returns:
        A deep copy of the original object (nested containers are
        duplicated recursively).
    """
    return copy.deepcopy(obj)
338

339

340
def shallow_copy(obj):
    """Return a one-level duplicate of *obj*.

    Args:
        obj: The object to be shallow copied.

    Returns:
        A shallow copy of the original object (contained objects are
        shared, not duplicated).
    """
    return copy.copy(obj)
350

351

352
def recursive_copy(obj, internal_copy=None):
    """Recursively copies an object with a selective copy method.

    ``dict``, ``tuple`` (including named tuples), and ``list`` instances
    are rebuilt with their contents copied recursively. Any other object
    is passed to *internal_copy* if one is given, otherwise returned as is.

    Args:
        obj: The object to be copied.
        internal_copy (callable, optional): The copy function to use for
            non-container objects. If ``None``, such objects are returned as is.

    Returns:
        The recursively copied object.
    """
    if isinstance(obj, dict):
        # Rebuild with the same mapping type, copying each value.
        copied_items = {
            key: recursive_copy(value, internal_copy) for key, value in obj.items()
        }
        return type(obj)(copied_items)

    if isinstance(obj, tuple):
        copied_elements = (recursive_copy(element, internal_copy) for element in obj)
        if hasattr(obj, "_fields"):
            # Named tuples take their fields as positional constructor args.
            return type(obj)(*copied_elements)
        return type(obj)(copied_elements)

    if isinstance(obj, list):
        return type(obj)(recursive_copy(element, internal_copy) for element in obj)

    # Leaf object: delegate to internal_copy when provided.
    return obj if internal_copy is None else internal_copy(obj)
385

386

387
def recursive_deep_copy(obj):
    """Recursively copy *obj*, deep-copying every non-container leaf.

    Args:
        obj: The object to be deep copied.

    Returns:
        A recursively deep-copied version of the original object.
    """
    return recursive_copy(obj, internal_copy=deep_copy)
399

400

401
def recursive_shallow_copy(obj):
    """Recursively copy *obj*, shallow-copying every non-container leaf.

    Args:
        obj: The object to be shallow copied.

    Returns:
        A recursively shallow-copied version of the original object.
    """
    return recursive_copy(obj, internal_copy=shallow_copy)
413

414

415
class LongString(str):
    """A ``str`` subclass that can carry a custom, usually shortened, repr.

    Useful for very long strings whose full content would clutter logs or
    debugger output: the string value is intact, but ``repr()`` can show a
    placeholder instead.
    """

    def __new__(cls, value, *, repr_str=None):
        obj = super().__new__(cls, value)
        # Stored on the instance; None means "fall back to the normal repr".
        obj._repr_str = repr_str
        return obj

    def __repr__(self):
        # getattr guards against instances re-created without __new__
        # (e.g. via copy/pickle round-trips), which may lack _repr_str.
        custom_repr = getattr(self, "_repr_str", None)
        if custom_repr is not None:
            return custom_repr
        return super().__repr__()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc