• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IBM / unitxt / 15805318527

22 Jun 2025 09:48AM UTC coverage: 79.887% (-0.3%) from 80.211%
15805318527

Pull #1811

github

web-flow
Merge 00f9eb5f5 into 9aa85a9a0
Pull Request #1811: Add multi turn tool calling task and support multiple tools per call

1698 of 2104 branches covered (80.7%)

Branch coverage included in aggregate %.

10563 of 13244 relevant lines covered (79.76%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.89
src/unitxt/struct_data_operators.py
1
"""This section describes unitxt operators for structured data.
2

3
These operators are specialized in handling structured data like tables.
4
For tables, expected input format is:
5

6
.. code-block:: text
7

8
    {
9
        "header": ["col1", "col2"],
10
        "rows": [["row11", "row12"], ["row21", "row22"], ["row31", "row32"]]
11
    }
12

13
For triples, expected input format is:
14

15
.. code-block:: text
16

17
    [[ "subject1", "relation1", "object1" ], [ "subject1", "relation2", "object2"]]
18

19
For key-value pairs, expected input format is:
20

21
.. code-block:: text
22

23
    {"key1": "value1", "key2": value2, "key3": "value3"}
24
"""
25

26
import ast
1✔
27
import json
1✔
28
import random
1✔
29
from abc import ABC, abstractmethod
1✔
30
from typing import (
1✔
31
    Any,
32
    Dict,
33
    List,
34
    Optional,
35
    Tuple,
36
)
37

38
import pandas as pd
1✔
39

40
from .augmentors import TypeDependentAugmentor
1✔
41
from .dict_utils import dict_get
1✔
42
from .error_utils import UnitxtWarning
1✔
43
from .operators import FieldOperator, InstanceOperator
1✔
44
from .random_utils import new_random_generator
1✔
45
from .serializers import ImageSerializer, TableSerializer
1✔
46
from .type_utils import isoftype
1✔
47
from .types import Table, ToolCall
1✔
48
from .utils import recursive_copy
1✔
49

50

51
def shuffle_columns(table: Table, seed=0) -> Table:
    """Reorder the columns of ``table`` in place and return the same object.

    A random generator is obtained from ``new_random_generator`` using the
    table and ``seed``; one permutation of the column positions is drawn from
    it and applied to the header and to every row.

    Args:
        table: table dict with optional ``"header"`` and ``"rows"`` entries.
        seed: extra value passed to the random generator factory.

    Returns:
        The input ``table`` with ``"header"`` and ``"rows"`` replaced.
    """
    column_names = table.get("header", [])
    body = table.get("rows", [])

    # Create the generator before the table is modified: the table itself is
    # handed to the generator factory.
    rng = new_random_generator({"table": table, "seed": seed})
    order = list(range(len(column_names)))
    rng.shuffle(order)

    # Apply the same permutation to the header and to each row.
    table["header"] = [column_names[pos] for pos in order]
    table["rows"] = [[record[pos] for pos in order] for record in body]
    return table
1✔
68

69

70
def shuffle_rows(table: Table, seed=0) -> Table:
    """Shuffle the rows of ``table`` in place and return the same object.

    Args:
        table: table dict; a missing ``"rows"`` entry is treated as empty.
        seed: extra value passed to the random generator factory.

    Returns:
        The input ``table`` whose ``"rows"`` list has been shuffled.
    """
    body = table.get("rows", [])
    rng = new_random_generator({"table": table, "seed": seed})
    # ``shuffle`` reorders the list in place; it is stored back so that a
    # table without a "rows" key ends up with an (empty) one.
    rng.shuffle(body)
    table["rows"] = body
    return table
1✔
79

80

81
class SerializeTable(ABC, TableSerializer):
    """Abstract base for serializers that flatten a table into a string.

    Concrete subclasses implement ``serialize_table``; the output format is
    entirely up to them. Optional row/column shuffling is applied here, on a
    copy of the input, before the subclass is invoked.
    """

    # Value forwarded to the shuffle helpers.
    seed: int = 0
    # When True, rows are shuffled before serialization.
    shuffle_rows: bool = False
    # When True, columns are shuffled before serialization.
    shuffle_columns: bool = False

    def serialize(self, value: Table, instance: Dict[str, Any]) -> str:
        """Copy ``value``, optionally shuffle it, and serialize the result."""
        # Work on a copy: the shuffle helpers modify their argument in place.
        table = recursive_copy(value)

        if self.shuffle_columns:
            table = shuffle_columns(table=table, seed=self.seed)
        if self.shuffle_rows:
            table = shuffle_rows(table=table, seed=self.seed)

        return self.serialize_table(table)

    @abstractmethod
    def serialize_table(self, table_content: Dict) -> str:
        """Turn the table dict into its serialized form (subclass hook)."""
        ...

    def process_header(self, header: List):
        """Optional hook for serializing the header; returns None here."""
        return None

    def process_row(self, row: List, row_index: int):
        """Optional hook for serializing one row; returns None here."""
        return None
×
113

114

115
# Concrete classes implementing table serializers
116
class SerializeTableAsIndexedRowMajor(SerializeTable):
    """Indexed Row Major Table Serializer.

    Commonly used row major serialization format.
    Format:  col : col1 | col2 | col 3 row 1 : val1 | val2 | val3 | val4 row 2 : val1 | ...
    """

    def serialize_table(self, table_content: Dict) -> str:
        """Serialize a table given in the prescribed input format.

        Args:
            table_content: dict with non-empty ``"header"`` and ``"rows"``.

        Returns:
            The header segment followed by one segment per row (rows are
            numbered from 1), separated by single spaces.

        Raises:
            AssertionError: if the header or the rows are missing or empty.
        """
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        segments = [self.process_header(header)]
        segments.extend(
            self.process_row(row, row_index=i) for i, row in enumerate(rows, start=1)
        )
        return " ".join(segments).strip()

    def process_header(self, header: List):
        """Return ``"col : "`` followed by the column names joined with ``" | "``."""
        # Column names are stringified so that non-string names (e.g. ints)
        # do not make ``join`` raise, matching how row cells are handled.
        return "col : " + " | ".join(str(col) for col in header)

    def process_row(self, row: List, row_index: int):
        """Return ``"row <row_index> : "`` followed by the cells joined with ``" | "``."""
        cells = " | ".join(str(value) for value in row)
        return f"row {row_index} : {cells}"
1✔
155

156

157
class SerializeTableAsMarkdown(SerializeTable):
    """Markdown Table Serializer.

    Markdown table format is used in GitHub code primarily.
    Format:

    .. code-block:: text

        |col1|col2|col3|
        |---|---|---|
        |A|4|1|
        |I|2|1|
        ...

    """

    def serialize_table(self, table_content: Dict) -> str:
        """Serialize a table given in the prescribed input format.

        Args:
            table_content: dict with non-empty ``"header"`` and ``"rows"``.

        Returns:
            The header line, the ``---`` separator line and one line per row,
            with surrounding whitespace stripped.

        Raises:
            AssertionError: if the header or the rows are missing or empty.
        """
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        lines = [self.process_header(header)]
        lines.extend(
            self.process_row(row, row_index=i) for i, row in enumerate(rows, start=1)
        )
        # Every piece already ends with a newline, so plain concatenation is
        # enough; strip() drops the final newline.
        return "".join(lines).strip()

    def process_header(self, header: List):
        """Return the header line followed by the ``|---|...|`` separator line."""
        # Column names are stringified so non-string names do not break join,
        # the same way row cells are stringified in process_row.
        names = "|".join(str(col) for col in header)
        dashes = "|".join(["---"] * len(header))
        return f"|{names}|\n|{dashes}|\n"

    def process_row(self, row: List, row_index: int):
        """Return one markdown line for ``row``; ``row_index`` is not used."""
        cells = "|".join(str(cell) for cell in row)
        return f"|{cells}|\n"
1✔
203

204

205
class SerializeTableAsDFLoader(SerializeTable):
    """DFLoader Table Serializer.

    Pandas dataframe based code snippet format serializer.
    Format(Sample):

    .. code-block:: python

        pd.DataFrame({
            "name" : ["Alex", "Diana", "Donald"],
            "age" : [26, 34, 39]
        },
        index=[0,1,2])
    """

    def serialize_table(self, table_content: Dict) -> str:
        """Render the table as a ``pd.DataFrame({...}, index=[...])`` snippet.

        Args:
            table_content: dict with non-empty ``"header"`` and ``"rows"``.

        Returns:
            A string holding the column data as JSON inside a
            ``pd.DataFrame`` call, with a 0-based index list.

        Raises:
            AssertionError: if the header or the rows are missing or empty.
        """
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        # Repeated column names get a "_<k>" suffix, where k is the number of
        # earlier occurrences; the first occurrence is left untouched.
        unique_header = []
        for position, name in enumerate(header):
            earlier = header[:position].count(name)
            unique_header.append(f"{name}_{earlier}" if earlier > 0 else name)

        frame = pd.DataFrame(rows, columns=unique_header)
        columns_as_lists = frame.to_dict(orient="list")

        # Drop the outer braces of the JSON object; the template supplies them.
        inner_json = json.dumps(columns_as_lists)[1:-1]
        index_repr = str(list(range(len(rows))))
        return f"pd.DataFrame({{\n{inner_json}}},\nindex={index_repr})"
248

249

250
class SerializeTableAsJson(SerializeTable):
    """JSON Table Serializer.

    Json format based serializer.
    Format(Sample):

    .. code-block:: json

        {
            "0":{"name":"Alex","age":26},
            "1":{"name":"Diana","age":34},
            "2":{"name":"Donald","age":39}
        }
    """

    def serialize_table(self, table_content: Dict) -> str:
        """Serialize the table as a JSON object keyed by 0-based row index.

        Args:
            table_content: dict with non-empty ``"header"`` and ``"rows"``.

        Returns:
            JSON text mapping each row index to a ``{column: value}`` object.

        Raises:
            AssertionError: if the header or the rows are missing or empty.
        """
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        # Cells are matched to column names by position.
        records = {
            row_idx: {header[col_idx]: cell for col_idx, cell in enumerate(row)}
            for row_idx, row in enumerate(rows)
        }
        return json.dumps(records)
1✔
281

282

283
class SerializeTableAsHTML(SerializeTable):
    """HTML Table Serializer.

    HTML table format used for rendering tables in web pages.
    Format(Sample):

    .. code-block:: html

        <table>
            <thead>
                <tr><th>name</th><th>age</th><th>sex</th></tr>
            </thead>
            <tbody>
                <tr><td>Alice</td><td>26</td><td>F</td></tr>
                <tr><td>Raj</td><td>34</td><td>M</td></tr>
            </tbody>
        </table>
    """

    def serialize_table(self, table_content: Dict) -> str:
        """Serialize the table as a ``<table>`` with ``<thead>`` and ``<tbody>``.

        Args:
            table_content: dict with non-empty ``"header"`` and ``"rows"``.

        Returns:
            The HTML markup as a string. Cell values are inserted as-is
            (no HTML escaping is performed).

        Raises:
            AssertionError: if the header or the rows are missing or empty.
        """
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        sections = [
            "<table>",
            self.process_header(header),
            self.process_rows(rows),
            "</table>",
        ]
        return "\n".join(sections).strip()

    def process_header(self, header: List) -> str:
        """Build the ``<thead>`` section with one ``<th>`` per column."""
        header_cells = "".join(f"<th>{col}</th>" for col in header)
        return "  <thead>\n    <tr>" + header_cells + "</tr>\n  </thead>"

    def process_rows(self, rows: List[List]) -> str:
        """Build the ``<tbody>`` section with one ``<tr>`` line per row."""
        pieces = ["  <tbody>"]
        for row in rows:
            row_cells = "".join(f"<td>{cell}</td>" for cell in row)
            pieces.append("\n    <tr>" + row_cells + "</tr>")
        pieces.append("\n  </tbody>")
        return "".join(pieces)
1✔
337

338

339
class SerializeTableAsConcatenation(SerializeTable):
    """Concat Serializer.

    Concat all table content to one string of header and rows.
    Format(Sample):
    name age Alex 26 Diana 34
    """

    def serialize_table(self, table_content: Dict) -> str:
        """Join the header and all row cells into one space-separated string.

        Args:
            table_content: dict that must contain ``"header"`` and ``"rows"``.

        Returns:
            The stringified header cells followed by the stringified cells of
            each row, separated by single spaces and stripped.

        Raises:
            KeyError: if ``"header"`` or ``"rows"`` is absent.
            AssertionError: if the header or the rows are empty.
        """
        header = table_content["header"]
        rows = table_content["rows"]

        assert header and rows, "Incorrect input table format"

        # One chunk for the header, then one chunk per row.
        chunks = [" ".join(map(str, header))]
        chunks.extend(" ".join(map(str, row)) for row in rows)
        return " ".join(chunks).strip()
×
363

364

365
class SerializeTableAsImage(SerializeTable):
    """Table serializer that renders the table as a PNG image.

    The table is drawn with matplotlib and the resulting image is handed to
    ``ImageSerializer``.
    """

    _requirements_list = ["matplotlib", "pillow"]

    def serialize_table(self, table_content: Dict) -> str:
        """Not supported; this serializer overrides ``serialize`` instead."""
        raise NotImplementedError()

    def serialize(self, value: Table, instance: Dict[str, Any]) -> str:
        """Render ``value`` as a PNG and serialize it with ``ImageSerializer``.

        Args:
            value: table dict with non-empty ``"header"`` and ``"rows"``.
            instance: the instance being processed; forwarded to
                ``ImageSerializer.serialize``.

        Returns:
            Whatever ``ImageSerializer().serialize`` returns for the image.

        Raises:
            AssertionError: if the header or the rows are missing or empty.
        """
        table_content = recursive_copy(value)
        if self.shuffle_columns:
            table_content = shuffle_columns(table=table_content, seed=self.seed)

        if self.shuffle_rows:
            table_content = shuffle_rows(table=table_content, seed=self.seed)

        import io

        import matplotlib.pyplot as plt
        import pandas as pd
        from PIL import Image

        # Extract headers and rows from the dictionary
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        # Fix duplicate columns, ensuring the first occurrence has no suffix
        header = [
            f"{col}_{header[:i].count(col)}" if header[:i].count(col) > 0 else col
            for i, col in enumerate(header)
        ]

        # Create a pandas DataFrame
        df = pd.DataFrame(rows, columns=header)

        # Second pass: a generated suffix may collide with an existing name,
        # so any name that is still duplicated gets its position appended.
        still_duplicated = df.columns.duplicated()
        df.columns = [
            f"{col}_{i}" if still_duplicated[i] else col
            for i, col in enumerate(df.columns)
        ]

        buf = io.BytesIO()
        # rc_context restores the previous font setting on exit, so rendering
        # a table does not change matplotlib's global configuration.
        with plt.rc_context({"font.family": "Serif"}):
            fig, ax = plt.subplots(figsize=(len(header) * 1.5, len(rows) * 0.5))
            try:
                ax.axis("off")  # Turn off the axes

                table = pd.plotting.table(ax, df, loc="center", cellLoc="center")
                table.auto_set_column_width(col=range(len(df.columns)))
                table.scale(1.5, 1.5)

                # Save this specific figure (not the implicit current one).
                fig.savefig(buf, format="png", bbox_inches="tight", dpi=150)
            finally:
                # Always release the figure, even if drawing or saving fails.
                plt.close(fig)
        buf.seek(0)

        # Load the image from the buffer using PIL
        image = Image.open(buf)
        return ImageSerializer().serialize({"image": image, "format": "png"}, instance)
×
424

425

426
# truncate cell value to maximum allowed length
427
def truncate_cell(cell_value, max_len):
    """Return a shortened copy of a cell value, or None if nothing changes.

    Args:
        cell_value: the cell content. Only strings are ever truncated.
        max_len: maximum number of characters to keep.

    Returns:
        The first ``max_len`` characters of ``cell_value`` when it is a
        non-blank string longer than ``max_len``; ``None`` in every other
        case (``None``, numbers, other non-string values, blank strings,
        strings already short enough). Callers treat ``None`` as
        "leave the cell unchanged".
    """
    # Non-string cells (None, numbers, nested lists/dicts, ...) are never
    # truncated; checking the type first avoids calling .strip() on them.
    if not isinstance(cell_value, str):
        return None

    if cell_value.strip() == "":
        return None

    if len(cell_value) > max_len:
        return cell_value[:max_len]

    return None
1✔
441

442

443
class TruncateTableCells(InstanceOperator):
    """Limit the maximum length of cell values in a table to reduce the overall length.

    Args:
        max_length (int) - maximum allowed length of cell values
        table (str) - path of the table field inside the instance
        text_output (str, optional) - path of the answers field; when given,
            answers equal to a truncated cell are replaced by the truncated
            value so they stay consistent with the table.
    """

    max_length: int = 15
    table: str = None
    text_output: Optional[str] = None

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """Truncate the table cells (and matching answers) of ``instance`` in place."""
        table = dict_get(instance, self.table)
        if self.text_output is None:
            answers = []
        else:
            answers = dict_get(instance, self.text_output)

        self.truncate_table(table_content=table, answers=answers)
        return instance

    def truncate_table(self, table_content: Dict, answers: Optional[List]):
        """Shorten over-long cells in place and mirror the change in ``answers``.

        Args:
            table_content: table dict; its rows are modified in place.
            answers: list updated in place, or None to skip the update.
        """
        # Maps each original cell value to its truncated form.
        replacements = {}

        for row in table_content.get("rows", []):
            for col_idx, original in enumerate(row):
                shortened = truncate_cell(original, self.max_length)
                if shortened is None:
                    # None means the cell does not need truncation.
                    continue
                replacements[original] = shortened
                row[col_idx] = shortened

        if answers is None:
            return

        # Replace answers that were truncated in the table; keep the rest.
        for idx, answer in enumerate(answers):
            answers[idx] = replacements.get(answer, answer)
×
486

487

488
class TruncateTableRows(FieldOperator):
    """Limits table rows to specified limit by removing excess rows via random selection.

    Args:
        rows_to_keep (int): number of rows to keep.
    """

    rows_to_keep: int = 10

    def process_value(self, table: Any) -> Any:
        """Return ``table`` with at most ``rows_to_keep`` rows."""
        return self.truncate_table_rows(table_content=table)

    def truncate_table_rows(self, table_content: Dict):
        """Drop randomly chosen rows until ``rows_to_keep`` remain.

        The surviving rows keep their original relative order. The selection
        uses the module-level ``random`` functions. ``table_content`` is
        modified in place and also returned.

        Args:
            table_content: table dict; a missing ``"rows"`` entry is treated
                as empty.

        Returns:
            The same dict, unchanged when it already has few enough rows.
        """
        rows = table_content.get("rows", [])
        num_rows = len(rows)

        # Nothing to do when the table is already small enough.
        if num_rows <= self.rows_to_keep:
            return table_content

        rows_to_delete = num_rows - self.rows_to_keep

        # A set gives constant-time membership tests in the filter below.
        deleted_rows_indices = set(random.sample(range(num_rows), rows_to_delete))

        table_content["rows"] = [
            row for i, row in enumerate(rows) if i not in deleted_rows_indices
        ]
        return table_content
1✔
522

523

524
class GetNumOfTableCells(FieldOperator):
    """Get the number of cells in the given table."""

    def process_value(self, table: Any) -> Any:
        """Return (number of rows) x (number of header columns).

        A missing ``"rows"`` or ``"header"`` entry is treated as empty, as
        elsewhere in this module, so such a table yields 0 instead of a
        ``TypeError`` from ``len(None)``.
        """
        num_of_rows = len(table.get("rows", []))
        num_of_cols = len(table.get("header", []))
        return num_of_rows * num_of_cols
×
531

532

533
class SerializeTableRowAsText(InstanceOperator):
    """Serializes a table row as text.

    Each field contributes ``"<field> is <value>, "`` to the output, so the
    result ends with a trailing ``", "``.

    Args:
        fields - fields to be included in serialization (iterated one by one).
        to_field (str) - serialized text field name.
        max_cell_length (int) - limits cell length to be considered, optional.
    """

    fields: str
    to_field: str
    max_cell_length: Optional[int] = None

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """Write the serialized row into ``instance[self.to_field]``."""
        pieces = []
        for field in self.fields:
            value = dict_get(instance, field)

            if self.max_cell_length is not None:
                shortened = truncate_cell(value, self.max_cell_length)
                # None means the value does not need truncation.
                if shortened is not None:
                    value = shortened

            pieces.append(field + " is " + str(value) + ", ")

        instance[self.to_field] = "".join(pieces)
        return instance
1✔
561

562

563
class SerializeTableRowAsList(InstanceOperator):
    """Serializes a table row as list.

    Each field contributes ``"<field>: <value>, "`` to the output, so the
    result ends with a trailing ``", "``.

    Args:
        fields - fields to be included in serialization (iterated one by one).
        to_field (str) - serialized text field name.
        max_cell_length (int) - limits cell length to be considered, optional.
    """

    fields: str
    to_field: str
    max_cell_length: Optional[int] = None

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """Write the serialized row into ``instance[self.to_field]``."""
        pieces = []
        for field in self.fields:
            value = dict_get(instance, field)

            if self.max_cell_length is not None:
                shortened = truncate_cell(value, self.max_cell_length)
                # None means the value does not need truncation.
                if shortened is not None:
                    value = shortened

            pieces.append(field + ": " + str(value) + ", ")

        instance[self.to_field] = "".join(pieces)
        return instance
1✔
591

592

593
class SerializeTriples(FieldOperator):
    """Serializes triples into a flat sequence.

    Each triple becomes ``subject : relation : object`` with the relation
    lower-cased; triples are joined with ``" | "``.

    Sample input in expected format:
    [[ "First Clearing", "LOCATION", "On NYS 52 1 Mi. Youngsville" ], [ "On NYS 52 1 Mi. Youngsville", "CITY_OR_TOWN", "Callicoon, New York"]]

    Sample output:
    First Clearing : location : On NYS 52 1 Mi. Youngsville | On NYS 52 1 Mi. Youngsville : city_or_town : Callicoon, New York

    """

    def process_value(self, tripleset: Any) -> Any:
        """Return the flat string form of ``tripleset``."""
        return self.serialize_triples(tripleset)

    def serialize_triples(self, tripleset) -> str:
        """Join all triples into one ``" | "``-separated string."""
        rendered = []
        for subj, rel, obj in tripleset:
            rendered.append(f"{subj} : {rel.lower()} : {obj}")
        return " | ".join(rendered)
611

612

613
class SerializeKeyValPairs(FieldOperator):
    """Serializes key, value pairs into a flat sequence.

    Sample input in expected format: {"name": "Alex", "age": 31, "sex": "M"}
    Sample output: name is Alex, age is 31, sex is M
    """

    def process_value(self, kvpairs: Any) -> Any:
        """Return the flat string form of ``kvpairs``."""
        return self.serialize_kvpairs(kvpairs)

    def serialize_kvpairs(self, kvpairs) -> str:
        """Render each pair as ``"<key> is <value>"`` and join with ``", "``.

        An empty mapping yields an empty string.
        """
        return ", ".join(f"{key} is {value}" for key, value in kvpairs.items())
1✔
630

631

632
class ListToKeyValPairs(InstanceOperator):
    """Maps list of keys and values into key:value pairs.

    ``fields[0]`` names the field holding the keys and ``fields[1]`` the
    field holding the values; the resulting dict is stored in ``to_field``.

    Sample input in expected format: {"keys": ["name", "age", "sex"], "values": ["Alex", 31, "M"]}
    Sample output: {"name": "Alex", "age": 31, "sex": "M"}
    """

    fields: List[str]
    to_field: str

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """Pair keys with values by position and store the dict on the instance."""
        keys_field, values_field = self.fields[0], self.fields[1]
        keylist = dict_get(instance, keys_field)
        valuelist = dict_get(instance, values_field)

        # zip stops at the shorter list; a repeated key keeps its last value.
        instance[self.to_field] = dict(zip(keylist, valuelist))
        return instance
1✔
655

656

657
class ConvertTableColNamesToSequential(FieldOperator):
    """Replaces actual table column names with static sequential names like col_0, col_1,...

    .. code-block:: text

        Sample input:
        {
            "header": ["name", "age"],
            "rows": [["Alex", 21], ["Donald", 34]]
        }

        Sample output:
        {
            "header": ["col_0", "col_1"],
            "rows": [["Alex", 21], ["Donald", 34]]
        }
    """

    def process_value(self, table: Any) -> Any:
        """Return a copy of ``table`` whose header uses sequential names."""
        # Copy first so the input table is left untouched.
        table_input = recursive_copy(table)
        return self.replace_header(table_content=table_input)

    def replace_header(self, table_content: Dict) -> Dict:
        """Overwrite the header of ``table_content`` with ``col_<i>`` names.

        Args:
            table_content: table dict with a non-empty ``"header"``; it is
                modified in place.

        Returns:
            The same dict with the new header.

        Raises:
            AssertionError: if the header is missing or empty.
        """
        header = table_content.get("header", [])

        assert header, "Input table missing header"

        table_content["header"] = [f"col_{i}" for i in range(len(header))]
        return table_content
1✔
690

691

692
class ShuffleTableRows(TypeDependentAugmentor):
    """Shuffles the input table rows randomly.

    .. code-block:: text

        Sample Input:
        {
            "header": ["name", "age"],
            "rows": [["Alex", 26], ["Raj", 34], ["Donald", 39]],
        }

        Sample Output:
        {
            "header": ["name", "age"],
            "rows": [["Donald", 39], ["Raj", 34], ["Alex", 26]],
        }
    """

    augmented_type = Table
    seed = 0

    def process_value(self, table: Any) -> Any:
        """Return a row-shuffled copy of ``table``; the input is not modified."""
        # shuffle_rows works in place, so it is given a private copy.
        return shuffle_rows(recursive_copy(table), self.seed)
1✔
716

717

718
class ShuffleTableColumns(TypeDependentAugmentor):
    """Shuffles the table columns randomly.

    .. code-block:: text

        Sample Input:
            {
                "header": ["name", "age"],
                "rows": [["Alex", 26], ["Raj", 34], ["Donald", 39]],
            }

        Sample Output:
            {
                "header": ["age", "name"],
                "rows": [[26, "Alex"], [34, "Raj"], [39, "Donald"]],
            }
    """

    augmented_type = Table
    seed = 0

    def process_value(self, table: Any) -> Any:
        """Return a column-shuffled copy of ``table``; the input is not modified."""
        # shuffle_columns works in place, so it is given a private copy.
        return shuffle_columns(recursive_copy(table), self.seed)
1✔
742

743

744
class LoadJson(FieldOperator):
    """Parse a JSON string into the corresponding Python object.

    Parsing is non-strict (``strict=False``), i.e. control characters such as
    literal newlines are accepted inside JSON strings, in both modes.
    """

    # Value returned instead of raising when allow_failure is True.
    failure_value: Any = None
    # When True, invalid JSON yields failure_value; otherwise the error propagates.
    allow_failure: bool = False

    def process_value(self, value: str) -> Any:
        """Return the parsed value of ``value``.

        Raises:
            json.JSONDecodeError: if ``value`` is not valid JSON and
                ``allow_failure`` is False.
        """
        try:
            # Same leniency in both modes: the failure-tolerant mode must not
            # reject input that the default mode accepts.
            return json.loads(value, strict=False)
        except json.JSONDecodeError:
            if self.allow_failure:
                return self.failure_value
            raise
1✔
756

757

758
class PythonCallProcessor(FieldOperator):
    """Parse a Python-style call expression such as ``f(1, x="a")`` into a tool call.

    The function must be referenced by a plain name, and every argument must
    be a literal accepted by ``ast.literal_eval``.
    """

    def process_value(self, value: str) -> ToolCall:
        """Return ``{"name": <function name>, "arguments": {...}}``.

        Keyword arguments are stored under their own names; positional
        arguments, if any, are collected in a list under ``"_args"``.
        """
        call = ast.parse(value, mode="eval").body
        name = call.func.id

        arguments = {kw.arg: ast.literal_eval(kw.value) for kw in call.keywords}

        # Handle positional args, if any
        if call.args:
            arguments["_args"] = [ast.literal_eval(node) for node in call.args]

        return {"name": name, "arguments": arguments}
×
769

770

771
def extract_possible_json_str(text):
    """Extract potential JSON string from text by finding outermost braces/brackets.

    The span starts at the first ``{`` or ``[`` and ends at the last closing
    delimiter of the matching kind (``}`` for ``{``, ``]`` for ``[``), so
    trailing text that contains the other bracket type is not included.

    Fallbacks: without any opening delimiter the span starts at index 0;
    without a matching closing delimiter it ends at the last ``}`` or ``]``
    of any kind, or at the end of the text if there is none.

    Args:
        text: the raw string, possibly with extra tokens around the JSON.

    Returns:
        The selected substring (possibly empty); it is not validated as JSON.
    """
    # Find first opening delimiter
    start_positions = [pos for pos in (text.find("{"), text.find("[")) if pos != -1]
    start = min(start_positions) if start_positions else 0

    # Prefer the last closing delimiter that matches the opener: a span that
    # opens with "{" and closes with "]" (or vice versa) can never be JSON.
    end = -1
    if start_positions:
        matching_closer = "}" if text[start] == "{" else "]"
        end = text.rfind(matching_closer)

    if end == -1:
        # No opener or no matching closer: use the last closer of any kind,
        # or the end of the text.
        end_positions = [pos for pos in (text.rfind("}"), text.rfind("]")) if pos != -1]
        end = max(end_positions) if end_positions else len(text) - 1

    return text[start : end + 1]
×
782

783

784
class ToolCallPostProcessor(FieldOperator):
    """Parse model output into a single tool call.

    The JSON part of the text is located with ``extract_possible_json_str``
    and parsed non-strictly. A single tool call, or a list holding exactly
    one tool call, is accepted; anything else yields ``failure_value``.
    """

    # Value returned when the output cannot be turned into one tool call.
    failure_value: Any = None
    # When True, invalid JSON yields failure_value; otherwise the error propagates.
    allow_failure: bool = False

    def process_value(self, value: str) -> ToolCall:
        """Return the tool call found in ``value`` or ``failure_value``.

        Raises:
            json.JSONDecodeError: if the extracted text is not valid JSON and
                ``allow_failure`` is False.
        """
        value = extract_possible_json_str(
            value
        )  # clear tokens such as <tool_call> focusing on the call json itself
        try:
            # Same leniency (strict=False) in both modes.
            result = json.loads(value, strict=False)
        except json.JSONDecodeError:
            if self.allow_failure:
                return self.failure_value
            raise
        if isoftype(result, List[ToolCall]):
            if len(result) > 1:
                UnitxtWarning(f"More than one tool returned from model: {result}")
                return self.failure_value
            if not result:
                # Guard: an empty list must not reach result[0] (IndexError).
                return self.failure_value
            return result[0]
        if not isoftype(result, ToolCall):
            return self.failure_value
        return result
×
807

808

809
class MultipleToolCallPostProcessor(FieldOperator):
    """Parse model output into a list of tool calls.

    The whole value is parsed as JSON (non-strictly). A list of tool calls is
    returned as is, a single tool call is wrapped in a list, and anything
    else yields ``failure_value``.
    """

    # Value returned when the output is not a tool call or a list of them.
    failure_value: Any = None
    # When True, invalid JSON yields failure_value; otherwise the error propagates.
    allow_failure: bool = False

    def process_value(self, value: str) -> List[ToolCall]:
        """Return the tool calls found in ``value`` or ``failure_value``.

        Raises:
            json.JSONDecodeError: if ``value`` is not valid JSON and
                ``allow_failure`` is False.
        """
        try:
            # Same leniency (strict=False) in both modes.
            result = json.loads(value, strict=False)
        except json.JSONDecodeError:
            if self.allow_failure:
                return self.failure_value
            raise
        if isoftype(result, List[ToolCall]):
            return result
        if not isoftype(result, ToolCall):
            return self.failure_value
        return [result]
×
826

827

828
class DumpJson(FieldOperator):
    """Serialize a field value to its JSON text representation."""

    def process_value(self, value: str) -> str:
        """Return ``json.dumps(value)``."""
        dumped = json.dumps(value)
        return dumped
1✔
831

832

833
class MapHTMLTableToJSON(FieldOperator):
    """Converts HTML table format to the basic one (JSON).

    JSON format:

    .. code-block:: json

        {
            "header": ["col1", "col2"],
            "rows": [["row11", "row12"], ["row21", "row22"], ["row31", "row32"]]
        }
    """

    _requirements_list = ["bs4"]

    def process_value(self, table: Any) -> Any:
        """Return the header/rows dict for the HTML table in ``table``."""
        return self.convert_to_json(table_content=table)

    def convert_to_json(self, table_content: str) -> Dict:
        """Parse an HTML table into ``{"header": [...], "rows": [[...], ...]}``.

        The header is the text of every ``<th>`` inside the first ``<thead>``;
        each row is the text of the ``<td>`` cells of a ``<tr>`` inside the
        first ``<tbody>``. Both sections must be present in the markup.
        """
        from bs4 import BeautifulSoup

        soup = BeautifulSoup(table_content, "html.parser")

        header = [th.get_text() for th in soup.find("thead").find_all("th")]
        rows = [
            [td.get_text() for td in tr.find_all("td")]
            for tr in soup.find("tbody").find_all("tr")
        ]

        return {"header": header, "rows": rows}
1✔
873

874

875
class MapTableListsToStdTableJSON(FieldOperator):
    """Converts lists table format to the basic one (JSON).

    The first element of the input becomes the header and the remaining
    elements become the rows.

    JSON format:

    .. code-block:: json

        {
            "header": ["col1", "col2"],
            "rows": [["row11", "row12"], ["row21", "row22"], ["row31", "row32"]]
        }
    """

    def process_value(self, table: Any) -> Any:
        """Convert ``table`` to the header/rows dictionary."""
        return self.map_tablelists_to_stdtablejson_util(table_content=table)

    def map_tablelists_to_stdtablejson_util(self, table_content: str) -> Dict:
        """Split ``table_content`` into its first element and everything after it."""
        first_entry = table_content[0]
        remaining_entries = table_content[1:]
        return {"header": first_entry, "rows": remaining_entries}
×
893

894

895
class ConstructTableFromRowsCols(InstanceOperator):
    """Maps column and row field into single table field encompassing both header and rows.

    field[0] = header string as List
    field[1] = rows string as List[List]
    field[2] = table caption string(optional)

    The header and rows values are Python-literal strings and are parsed with
    ``ast.literal_eval``. The resulting dictionary is stored in ``to_field``
    and gains a ``"caption"`` key only when a caption value is present and
    not ``None``.
    """

    fields: List[str]
    to_field: str

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """Build the table dictionary and store it in ``instance[self.to_field]``.

        Returns:
            The same ``instance`` object, updated in place.
        """
        header = dict_get(instance, self.fields[0])
        rows = dict_get(instance, self.fields[1])

        # NOTE(review): the caption is read with plain indexing while header
        # and rows go through dict_get -- confirm whether the caption field
        # is meant to be resolved the same way.
        caption = instance[self.fields[2]] if len(self.fields) >= 3 else None

        # literal_eval only accepts Python literals, so no code is executed.
        output_dict = {
            "header": ast.literal_eval(header),
            "rows": ast.literal_eval(rows),
        }

        if caption is not None:
            output_dict["caption"] = caption

        instance[self.to_field] = output_dict

        return instance
×
930

931

932
class TransposeTable(TypeDependentAugmentor):
    """Transpose a table.

    Each original column becomes a row that starts with the column name; the
    new header is a blank cell followed by the original row indices as strings.

    .. code-block:: text

        Sample Input:
            {
                "header": ["name", "age", "sex"],
                "rows": [["Alice", 26, "F"], ["Raj", 34, "M"], ["Donald", 39, "M"]],
            }

        Sample Output:
            {
                "header": [" ", "0", "1", "2"],
                "rows": [["name", "Alice", "Raj", "Donald"], ["age", 26, 34, 39], ["sex", "F", "M", "M"]],
            }

    """

    augmented_type = Table

    def process_value(self, table: Any) -> Any:
        """Return the transposed form of ``table``."""
        return self.transpose_table(table)

    def transpose_table(self, table: Dict) -> Dict:
        """Swap the rows and columns of ``table`` and return a new table dict."""
        column_names = table["header"]
        records = table["rows"]

        # New header: a blank corner cell, then one label per original row.
        new_header = [" "]
        new_header.extend(str(position) for position in range(len(records)))

        # One output row per original column: its name, then its values.
        new_rows = []
        for col_idx, col_name in enumerate(column_names):
            new_rows.append([col_name, *(record[col_idx] for record in records)])

        return {"header": new_header, "rows": new_rows}
1✔
968

969

970
class DuplicateTableRows(TypeDependentAugmentor):
    """Duplicates specific rows of a table for the given number of times.

    Args:
        row_indices (List[int]): rows to be duplicated

        times(int): each row to be duplicated is to show that many times
    """

    augmented_type = Table

    row_indices: List[int] = []
    times: int = 1

    def process_value(self, table: Any) -> Any:
        """Return a new table in which the selected rows appear ``times`` times."""
        header = table["header"]
        source_rows = table["rows"]

        expanded_rows = []
        for position, row in enumerate(source_rows):
            # Selected rows are emitted `times` times; every other row once.
            copies = self.times if position in self.row_indices else 1
            expanded_rows.extend([row] * copies)

        return {"header": header, "rows": expanded_rows}
1✔
1001

1002

1003
class DuplicateTableColumns(TypeDependentAugmentor):
    """Duplicates specific columns of a table for the given number of times.

    Args:
        column_indices (List[int]): columns to be duplicated

        times(int): each column to be duplicated is to show that many times
    """

    augmented_type = Table

    column_indices: List[int] = []
    times: int = 1

    def _repeat_selected(self, cells):
        """Return ``cells`` with every cell at a selected index repeated ``times`` times."""
        expanded = []
        for position, cell in enumerate(cells):
            copies = self.times if position in self.column_indices else 1
            expanded.extend([cell] * copies)
        return expanded

    def process_value(self, table: Any) -> Any:
        """Return a new table whose selected columns appear ``times`` times."""
        header = table["header"]
        rows = table["rows"]

        # The same expansion is applied to the header and to every row so
        # that column names stay aligned with their values.
        widened_header = self._repeat_selected(header)
        widened_rows = [self._repeat_selected(row) for row in rows]

        return {"header": widened_header, "rows": widened_rows}
1✔
1043

1044

1045
class InsertEmptyTableRows(TypeDependentAugmentor):
    """Inserts empty rows in a table randomly for the given number of times.

    An empty row is a list of empty strings with one entry per header column.
    Insert positions are drawn from the module-level ``random`` generator.

    Args:
        times(int) - how many times to insert
    """

    augmented_type = Table

    times: int = 0

    def process_value(self, table: Any) -> Any:
        """Return a new table with ``times`` empty rows inserted at random positions.

        The input table's row list is left untouched.
        """
        header = table["header"]
        # Shallow copy: inserting into the original list would silently
        # modify the caller's table.
        rows = list(table["rows"])

        column_count = len(header)
        for _ in range(self.times):
            # randint is inclusive on both ends, so the row may also be appended.
            insert_pos = random.randint(0, len(rows))
            # A fresh list per insertion keeps the empty rows independent.
            rows.insert(insert_pos, [""] * column_count)

        return {"header": header, "rows": rows}
1✔
1073

1074

1075
class MaskColumnsNames(TypeDependentAugmentor):
    """Mask the names of tables columns with dummies "Col1", "Col2" etc."""

    augmented_type = Table

    def process_value(self, table: Any) -> Any:
        """Return a table with 1-based placeholder column names and the same rows."""
        column_count = len(table["header"])
        placeholder_names = [f"Col{number}" for number in range(1, column_count + 1)]

        return {"header": placeholder_names, "rows": table["rows"]}
×
1084

1085

1086
class ShuffleColumnsNames(TypeDependentAugmentor):
    """Shuffle table columns names to be displayed in random order.

    Only the header is permuted; the rows are passed through as they are.
    The permutation is drawn from the module-level ``random`` generator.
    """

    augmented_type = Table

    def process_value(self, table: Any) -> Any:
        """Return a table with a shuffled copy of the header and the same rows.

        The input table's header list is left untouched.
        """
        # Copy first: random.shuffle works in place and would otherwise
        # reorder the caller's header list.
        shuffled_header = list(table["header"])
        random.shuffle(shuffled_header)

        return {"header": shuffled_header, "rows": table["rows"]}
×
1096

1097

1098
class JsonStrToDict(FieldOperator):
    """Convert a JSON string representing key-value pairs into a dictionary.

    Ensure keys and values are strings, and there are no None values.

    If the text cannot be parsed, or does not parse to a dictionary with
    string keys, a warning is issued and an empty dictionary is returned.
    """

    def process_value(self, text: str) -> Dict[str, str]:
        """Parse ``text`` and return its entries as a ``str`` to ``str`` dictionary."""
        try:
            dict_value = json.loads(text)
        except Exception as e:
            # Deliberately best-effort: any parsing failure degrades to an
            # empty result instead of aborting the stream.
            UnitxtWarning(
                f"Unable to convert input text to json format in JsonStrToDict due to {e}. Text: {text}"
            )
            dict_value = {}
        if not isoftype(dict_value, Dict[str, Any]):
            UnitxtWarning(
                f"Unable to convert input text to dictionary in JsonStrToDict. Text: {text}"
            )
            dict_value = {}
        # Entries whose value is None are dropped; everything else is stringified.
        return {str(k): str(v) for k, v in dict_value.items() if v is not None}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc