• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IBM / unitxt / 16704320175

03 Aug 2025 11:05AM UTC coverage: 80.829% (-0.4%) from 81.213%
16704320175

Pull #1845

github

web-flow
Merge 59428aa88 into 5372aa6df
Pull Request #1845: Allow using python functions instead of operators (e.g in pre-processing pipeline)

1576 of 1970 branches covered (80.0%)

Branch coverage included in aggregate %.

10685 of 13199 relevant lines covered (80.95%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.89
src/unitxt/text_utils.py
1
import re
1✔
2
import shutil
1✔
3
import types
1✔
4
from typing import List, Tuple
1✔
5

6
import pandas as pd
1✔
7

8
from .logging_utils import get_logger
1✔
9

10
# Module-level logger; print_dict emits its formatted output through it.
logger = get_logger()
11

12

13
def split_words(s):
    """Break an identifier-like string into its constituent words.

    Word boundaries are taken at PascalCase/camelCase capitals, at underscores
    and hyphens, and between letters and digits.

    Args:
        s (str): The string to be split.

    Returns:
        list: The words found in ``s``, in their original order.
    """
    # Put a space in front of every run of capitals, then in front of every
    # capitalised word, so that case changes become whitespace boundaries.
    spaced = re.sub(r"([A-Z]+)", r" \1", s)
    spaced = re.sub(r"([A-Z][a-z]+)", r" \1", spaced).strip()

    # Separators and letter/digit transitions are turned into spaces as well.
    for pattern, replacement in (
        (r"[_-]", " "),
        (r"([a-zA-Z])(\d)", r"\1 \2"),
        (r"(\d)([a-zA-Z])", r"\1 \2"),
    ):
        spaced = re.sub(pattern, replacement, spaced)

    return spaced.split()
31

32

33
def is_camel_case(s):
    """Checks if a string is in (upper) CamelCase.

    Note that, despite the name, the pattern requires the string to start with
    an uppercase letter; the remaining characters may be any mix of ASCII
    letters and digits.

    Args:
        s (str): The string to be checked.

    Returns:
        bool: True if the string matches the pattern, False otherwise.
    """
    # Accepts exactly the same strings as the former
    # r"^[A-Z]+([a-z0-9]*[A-Z]*[a-z0-9]*)*$", but without nested quantifiers,
    # so a near-miss input can no longer trigger exponential backtracking.
    return re.match(r"^[A-Z][A-Za-z0-9]*$", s) is not None
43

44

45
def is_snake_case(s):
    """Tell whether a string is written in snake_case.

    A snake_case string is one or more groups of lowercase letters and digits,
    with a single underscore between consecutive groups.

    Args:
        s (str): The string to be checked.

    Returns:
        bool: True if the string is in snake_case, False otherwise.
    """
    matched = re.match(r"^[a-z0-9]+(_[a-z0-9]+)*$", s)
    if matched is None:
        return False
    return True
55

56

57
def camel_to_snake_case(s):
    """Rewrite a camelCase string as snake_case.

    Args:
        s (str): The string to be converted.

    Returns:
        str: The lower-cased string, with underscores inserted at word starts.
    """
    # First pass: an uppercase letter gets a leading underscore unless the
    # character before it is itself uppercase, an underscore or a hyphen.
    with_word_breaks = re.sub(r"(?<=[^A-Z_-])([A-Z])", r"_\1", s)

    # Second pass: inside a run of capitals, the last capital starts a new word
    # when it is followed by a lowercase letter or a digit.
    with_acronym_breaks = re.sub(
        r"([A-Z]+)([A-Z][a-z0-9])", r"\1_\2", with_word_breaks
    )

    return with_acronym_breaks.lower()
73

74

75
def _indent_and_wrap(text, indent_str, line_width):
    """Indent each line of ``text`` and hard-wrap lines that are too long.

    A line is wrapped into chunks of ``line_width`` characters when its length
    plus the indentation exceeds ``line_width``. Trailing whitespace is
    stripped from every emitted line, and every emitted line ends with a
    newline. ``line_width`` must be at least 1.
    """
    out = []
    for line in text.split("\n"):
        if len(line) + len(indent_str) > line_width:
            out.extend(
                f"{indent_str}{line[start : start + line_width].rstrip()}\n"
                for start in range(0, len(line), line_width)
            )
        else:
            out.append(f"{indent_str}{line.rstrip()}\n")
    return "".join(out)


def to_pretty_string(
    value,
    indent=0,
    indent_delta=4,
    max_chars=None,
    keys=None,
    item_label=None,
    float_format=None,
):
    """Constructs a formatted string representation of various data structures (dicts, lists, tuples, and DataFrames).

    Dicts, lists and tuples are rendered recursively: each item gets a header
    line with its key/index and type name, followed by the item's own
    rendering one indentation level deeper. DataFrames and all other values
    are rendered as text, indented and wrapped to the available width.

    Args:
        value: The Python data structure to be formatted.
        indent (int, optional): The current level of indentation. Defaults to 0.
        indent_delta (int, optional): Amount of spaces to add per indentation level. Defaults to 4.
        max_chars (int, optional): Max characters per line before wrapping. Defaults to terminal width - 10.
        keys (List[str], optional): For dicts, optionally specify keys and order.
            Applies to the top-level dict only; it is not passed on to nested values.
        item_label (str, optional): Internal parameter for labeling items. Currently unused.
        float_format (str, optional): Format string for float values (e.g., ".2f"). Defaults to None.

    Returns:
        str: The formatted text; every line ends with a newline.

    Raises:
        ValueError: If ``keys`` names a field that is missing from a dict ``value``.
    """
    max_chars = max_chars or shutil.get_terminal_size()[0] - 10
    indent_str = " " * indent

    if isinstance(value, dict):
        keys_to_print = keys if keys is not None else list(value.keys())

        # Validate all requested keys up front, before producing any output.
        for k in keys_to_print:
            if k not in value:
                raise ValueError(
                    f"Dictionary does not contain field '{k}' specified in 'keys' argument. "
                    f"The available keys are {list(value.keys())}"
                )

        parts = []
        for k in keys_to_print:
            v = value[k]
            parts.append(f"{indent_str}{k} ({type(v).__name__}):\n")
            parts.append(
                to_pretty_string(
                    v,
                    indent=indent + indent_delta,
                    indent_delta=indent_delta,
                    max_chars=max_chars,
                    float_format=float_format,
                )
            )
        return "".join(parts)

    if isinstance(value, (list, tuple)):
        parts = []
        for i, v in enumerate(value):
            label = f"[{i}]" if isinstance(value, list) else f"({i})"
            parts.append(f"{indent_str}{label} ({type(v).__name__}):\n")
            parts.append(
                to_pretty_string(
                    v,
                    indent=indent + indent_delta,
                    indent_delta=indent_delta,
                    max_chars=max_chars,
                    float_format=float_format,
                )
            )
        return "".join(parts)

    # Never let the wrap width drop below one character: with a width of zero
    # or less (indent >= max_chars) the wrapping loop could not make progress.
    line_width = max(1, max_chars - indent)

    if isinstance(value, pd.DataFrame):
        options = [
            "display.max_rows",
            None,
            "display.max_columns",
            None,
            "display.max_colwidth",
            None,
            "display.width",
            line_width,
        ]
        if float_format is not None:
            options.extend(
                ["display.float_format", ("{:," + float_format + "}").format]
            )
        with pd.option_context(*options):
            df_str = repr(value)
        return _indent_and_wrap(df_str, indent_str, line_width)

    # Scalar values, including floats.
    if isinstance(value, float) and float_format:
        formatted_value = f"{value:{float_format}}"
    else:
        formatted_value = str(value)
    return _indent_and_wrap(formatted_value, indent_str, line_width)
186

187

188
def construct_dict_as_yaml_lines(d, indent_delta=2) -> List[str]:
    """Constructs the lines of a dictionary formatted as yaml.

    Dicts and lists are rendered recursively; any other value is rendered via
    ``str``. Dictionary keys are stringified, so non-string keys (e.g. ints)
    are accepted.

    Args:
        d: The element to be formatted.
        indent_delta (int, optional): The amount of spaces to add for each level of indentation. Defaults to 2.

    Returns:
        List[str]: The yaml lines, never empty. An empty dict yields ``["{}"]``
        and an empty list yields ``["[]"]``.
    """
    assert (
        indent_delta >= 2
    ), f"Needs at least 2 position indentations, for the case of list elements, that are to be preceded each by ' -'. Got indent_delta={indent_delta}."
    indent_delta_str = " " * indent_delta
    # First line of a list element: the last two indentation spaces become "- ".
    ticked_indent_delta_str = indent_delta_str[:-2] + "- "
    res = []  # computed hereunder as a list of lines

    if isinstance(d, dict):
        if len(d) == 0:
            return ["{}"]
        for key, val in d.items():
            key_str = str(key)
            # Keys that are empty or contain a space are quoted.
            printable_key = (
                f'"{key_str}"' if (" " in key_str) or (key_str == "") else key_str
            )
            res.append(printable_key + ": ")
            yaml_for_val = construct_dict_as_yaml_lines(val, indent_delta=indent_delta)
            assert len(yaml_for_val) > 0
            if len(yaml_for_val) == 1:
                # A single-line value stays on the same line as its key.
                res[-1] += yaml_for_val[0]
            else:
                res.extend(indent_delta_str + line for line in yaml_for_val)
        return res

    if isinstance(d, list):
        if len(d) == 0:
            return ["[]"]
        for val in d:
            yaml_for_val = construct_dict_as_yaml_lines(val, indent_delta=indent_delta)
            assert len(yaml_for_val) > 0
            res.append(ticked_indent_delta_str + yaml_for_val[0])
            res.extend(indent_delta_str + line for line in yaml_for_val[1:])
        return res

    # Scalar: escape newlines and double quotes; quote the result only when it
    # is empty or contained a newline.
    d1 = str(d).replace("\n", "\\n").replace('"', '\\"')
    if "\\n" in d1 or d1 == "":
        d1 = f'"{d1}"'
    return [d1]
233

234

235
def construct_dict_as_python_lines(d, indent_delta=4) -> List[str]:
    """Render a (possibly nested) value as lines of python-like source code.

    A dict holding a ``"__type__"`` entry is rendered as a call,
    ``__type__<name>(key=value, ...)``; any other dict is rendered as a dict
    literal with double-quoted keys. Lists are rendered as list literals,
    strings are wrapped in double quotes as-is (no escaping is applied), and
    ``None``/numbers/booleans are rendered through ``str``. Plain python
    functions are rendered as their dedented source lines.

    Args:
        d: The element to be formatted.
        indent_delta (int, optional): The amount of spaces to add for each level of indentation. Defaults to 4.

    Returns:
        List[str]: The rendered lines, never empty.

    Raises:
        RuntimeError: If ``d`` is of a type this function does not know how to render.
    """
    pad = " " * indent_delta

    if isinstance(d, dict):
        if not d:
            return ["{}"]

        is_typed = "__type__" in d
        if is_typed:
            lines = ["__type__" + d["__type__"] + "("]
            if len(d) == 1:
                lines[0] += ")"
                return lines
        else:
            lines = ["{"]

        separator = "=" if is_typed else ": "
        for key, val in d.items():
            if key == "__type__":
                continue
            shown_key = key if is_typed else f'"{key}"'
            prefix = shown_key + separator
            rendered = construct_dict_as_python_lines(val, indent_delta=indent_delta)
            assert len(rendered) > 0

            if len(rendered) == 1:
                lines.append(prefix + rendered[0] + ",")
                continue

            lines.append(prefix + rendered[0])
            inner = rendered[1:-1]
            if rendered[0].startswith(("{", "[")):
                lines.extend(pad + line for line in inner)
            else:
                # val is type, its inner lines are already indented
                lines.extend(inner)
            lines.append(rendered[-1] + ",")

        lines.append(")" if is_typed else "}")
        if is_typed:
            # Everything between the opening and the closing line is one level deeper.
            lines[1:-1] = [pad + line for line in lines[1:-1]]
        return lines

    if isinstance(d, list):
        if not d:
            return ["[]"]
        lines = ["["]
        for val in d:
            rendered = construct_dict_as_python_lines(val, indent_delta=indent_delta)
            assert len(rendered) > 0
            lines.extend(rendered[:-1])
            lines.append(rendered[-1] + ",")
        lines.append("]")
        return lines

    if isinstance(d, str):
        return [f'"{d}"']
    if d is None or isinstance(d, (int, float, bool)):
        return [f"{d}"]

    if isinstance(d, types.FunctionType):
        from .utils import get_function_source

        try:
            source_lines = get_function_source(d).splitlines()

            # The indentation of the first source line is the base to strip.
            base_indent = len(source_lines[0]) - len(source_lines[0].lstrip())
            base_prefix = " " * base_indent

            dedented = []
            for line in source_lines:
                if not line.strip():
                    # Keep blank lines, but without any whitespace.
                    dedented.append("")
                elif line.startswith(base_prefix):
                    # Drop only the base indentation, keep the internal one.
                    dedented.append(line[base_indent:])
                else:
                    dedented.append(line.lstrip())
            return dedented

        except (OSError, TypeError):
            # If source is not available
            return [f"<function {d.__name__} (source unavailable)>"]

    raise RuntimeError(f"unrecognized value to print as python: {d}")
330

331

332
def print_dict(
    d, indent=0, indent_delta=4, max_chars=None, keys_to_print=None, log_level="info"
):
    """Log a pretty-printed rendering of ``d`` through the module logger.

    Args:
        d: The value to render, passed to ``to_pretty_string``.
        indent (int, optional): Initial indentation. Defaults to 0.
        indent_delta (int, optional): Spaces added per nesting level. Defaults to 4.
        max_chars (int, optional): Maximal line width, forwarded to ``to_pretty_string``.
        keys_to_print (optional): For dicts, the keys to render, in order.
        log_level (str, optional): Name of the logger method to call. Defaults to "info".
    """
    pretty = to_pretty_string(
        d,
        indent=indent,
        indent_delta=indent_delta,
        max_chars=max_chars,
        keys=keys_to_print,
    )
    emit = getattr(logger, log_level)
    # A leading newline makes the rendering start on its own line in the log.
    emit("\n" + pretty)
338

339

340
def print_dict_as_yaml(d: dict, indent_delta=2) -> str:
    """Return ``d`` formatted as a yaml string (the string is returned, not printed).

    Args:
        d: The element to be formatted.
        indent_delta (int, optional): Spaces added per indentation level. Defaults to 2.
    """
    return "\n".join(construct_dict_as_yaml_lines(d, indent_delta=indent_delta))
345

346

347
def print_dict_as_python(d: dict, indent_delta=4) -> str:
    """Return ``d`` formatted as a piece of python code (the string is returned, not printed).

    Args:
        d: The element to be formatted.
        indent_delta (int, optional): Spaces added per indentation level. Defaults to 4.
    """
    rendered_lines = construct_dict_as_python_lines(d, indent_delta=indent_delta)
    assert len(rendered_lines) > 0
    text = "\n".join(rendered_lines)
    return text
351

352

353
def nested_tuple_to_string(nested_tuple: tuple) -> str:
    """Flatten a nested tuple into a single underscore-separated string.

    Inner tuples are converted recursively; every other item is converted
    with ``str``.

    Args:
        nested_tuple (tuple): The nested tuple to be converted.

    Returns:
        str: The string representation of the nested tuple.
    """
    return "_".join(
        nested_tuple_to_string(element) if isinstance(element, tuple) else str(element)
        for element in nested_tuple
    )
369

370

371
def is_made_of_sub_strings(string, sub_strings):
    """Check whether ``string`` is a concatenation of items from ``sub_strings``.

    Args:
        string (str): The string to be checked.
        sub_strings: The allowed building blocks; each is matched literally.

    Returns:
        bool: True if ``string`` consists solely of one or more of the given
        sub strings, placed back to back.
    """
    pattern = "(" + "|".join(map(re.escape, sub_strings)) + ")+"
    # fullmatch, unlike match with a trailing "$", does not tolerate a stray
    # newline at the end of the string.
    return re.fullmatch(pattern, string) is not None
374

375

376
# Given all the lines of a card preparer file, e.g. all the lines of prepare/cards/cohere_for_ai.py,
377
# and an object name, e.g. TaskCard(,
378
# return the ordinal number of the line that starts that object, in our example: the
379
# line number of the following line (notice that the line where TaskCard is imported
380
# is not supposed to return):
381
#         card = TaskCard(
382
# and the line number of the line that ends the object, in our case the line that include
383
# the matching close:
384
#         )
385
# This util depends on ruff to ensure this setting of the card file: that a close of one
386
# tag and the open of the next tag, do not sit in same line, when both tags being
387
# major level within TaskCard.
388
# It also prepares for the case that  __description__ tag does not contain balanced
389
# parentheses, since it is often cut in the middle, (with  "... see more at")
390
# flake8: noqa: B007
391
# flake8: noqa: C901
392
def lines_defining_obj_in_card(
    all_lines: List[str], obj_name: str, start_search_at_line: int = 0
) -> Tuple[int, int]:
    """Find the first and last line of an object definition within card-file lines.

    The object starts at the first line, at or after ``start_search_at_line``,
    that contains ``obj_name``. It ends at the line where the numbers of
    opening and closing brackets seen since the starting line become equal.
    Lines belonging to a ``__description__`` tag are skipped without counting
    their brackets, since that text is not trusted to be balanced.

    Args:
        all_lines (List[str]): The lines of the card file.
        obj_name (str): The text identifying the object, e.g. ``"TaskCard("``.
        start_search_at_line (int, optional): Index of the first line to search. Defaults to 0.

    Returns:
        Tuple[int, int]: Indices (into ``all_lines``) of the first and last line
        of the object, or ``(-1, -1)`` if ``obj_name`` is not found.

    Raises:
        ValueError: If the lines run out before the object is closed.
    """
    starting_line = None
    for candidate in range(start_search_at_line, len(all_lines)):
        if obj_name in all_lines[candidate]:
            starting_line = candidate
            break
    if starting_line is None:
        # obj_name found nowhere in the searched lines (or nothing to search)
        return (-1, -1)

    exhausted_message = "input lines were exhausted before the matching close is found"
    last_index = len(all_lines) - 1
    num_of_opens = 0
    num_of_closes = 0
    ending_line = starting_line - 1
    while ending_line < last_index:
        ending_line += 1

        if "__description__" in all_lines[ending_line]:
            # can not trust parentheses inside description, because this is mainly truncated
            # free text.
            # We do trust the indentation enforced by ruff, and the way we build __description__:
            # a line consisting of only __description__=(
            # followed by one or more lines of text, can not trust opens and closes
            # in them, followed by a line consisting of only:  ),
            # where the ) is indented with the beginning of __description__
            # We also prepare for the case that, when not entered by us, __description__=
            # is not followed by a ( and the whole description does not end with a single ) in its line.
            # We build on ruff making the line following the description start with same indentation
            # or 4 less (i.e., the following line is the closing of the card).
            tag_indentation = all_lines[ending_line].index("__description__")
            starts_with_parent = "__description__=(" in all_lines[ending_line]
            if starts_with_parent:
                last_line_to_start_with = (" " * tag_indentation) + r"\)"
            else:
                # actually, the line that follows the description
                last_line_to_start_with1 = (" " * tag_indentation) + "[^ ]"
                last_line_to_start_with2 = (" " * (tag_indentation - 4)) + "[^ ]"
                last_line_to_start_with = (
                    "("
                    + last_line_to_start_with1
                    + "|"
                    + last_line_to_start_with2
                    + ")"
                )
            closing_pattern = re.compile("^" + last_line_to_start_with)
            ending_line += 1
            while ending_line <= last_index and not closing_pattern.search(
                all_lines[ending_line]
            ):
                ending_line += 1
            if ending_line > last_index:
                # the description never ends within the given lines
                raise ValueError(exhausted_message)
            if "__description__" in obj_name:
                return (
                    starting_line,
                    ending_line if starts_with_parent else ending_line - 1,
                )

            if starts_with_parent:
                ending_line += 1
                if ending_line > last_index:
                    # nothing follows the closing line of the description
                    raise ValueError(exhausted_message)

            # we continue in card, having passed the description, ending line points
            # to the line that follows description

        num_of_opens += len(re.findall(r"[({[]", all_lines[ending_line]))
        num_of_closes += len(re.findall(r"[)}\]]", all_lines[ending_line]))
        if num_of_closes == num_of_opens:
            break

    if num_of_closes != num_of_opens:
        raise ValueError(exhausted_message)

    return (starting_line, ending_line)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc