IBM / unitxt / build 16719844294

04 Aug 2025 09:51AM UTC coverage: 81.089% (-0.1%) from 81.22%

push · github · web-flow

Fix compatibility with datasets 4.0 (#1861)

* Fix compatibility with datasets 4.0

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix type

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix datasets

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Finalize deprecation of trust_remote_code in datasets

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Another fix

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Update finqa

Signed-off-by: elronbandel <elronbandel@gmail.com>

* fix finqa to specific commit

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Update banking, ethos and uner

Signed-off-by: elronbandel <elronbandel@gmail.com>

* fix dart, reuters, hotpotqa, bbq, legalbench

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix wikitq

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix tab fact

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix multidoc2dial

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Add missing catalog assets

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Add tldr and make prep tests faster

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix turl

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Add backward compatibility for lower datasets versions

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix finqa

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix wikitq

Signed-off-by: elronbandel <elronbandel@gmail.com>

* Fix tabfact and wikitq

Signed-off-by: elronbandel <elronbandel@gmail.com>

---------

Signed-off-by: elronbandel <elronbandel@gmail.com>

1591 of 1972 branches covered (80.68%)

Branch coverage included in aggregate %.

10754 of 13252 relevant lines covered (81.15%)

0.81 hits per line

Source file: src/unitxt/struct_data_operators.py (76.92% covered)
"""This section describes unitxt operators for structured data.

These operators are specialized in handling structured data like tables.
For tables, the expected input format is:

.. code-block:: text

    {
        "header": ["col1", "col2"],
        "rows": [["row11", "row12"], ["row21", "row22"], ["row31", "row32"]]
    }

For triples, the expected input format is:

.. code-block:: text

    [["subject1", "relation1", "object1"], ["subject1", "relation2", "object2"]]

For key-value pairs, the expected input format is:

.. code-block:: text

    {"key1": "value1", "key2": "value2", "key3": "value3"}
"""

import ast
import csv
import io
import json
import random
from abc import ABC, abstractmethod
from typing import (
    Any,
    Dict,
    List,
    Literal,
    Optional,
    Tuple,
)

import pandas as pd

from .augmentors import TypeDependentAugmentor
from .dict_utils import dict_get
from .error_utils import UnitxtWarning
from .operators import FieldOperator, InstanceOperator
from .random_utils import new_random_generator
from .serializers import ImageSerializer, TableSerializer
from .type_utils import isoftype
from .types import Table, ToolCall
from .utils import recursive_copy


def shuffle_columns(table: Table, seed=0) -> Table:
    # extract header & rows from the dictionary
    header = table.get("header", [])
    rows = table.get("rows", [])
    # shuffle the indices first
    indices = list(range(len(header)))
    random_generator = new_random_generator({"table": table, "seed": seed})
    random_generator.shuffle(indices)

    # shuffle the header & rows based on those indices
    shuffled_header = [header[i] for i in indices]
    shuffled_rows = [[row[i] for i in indices] for row in rows]

    table["header"] = shuffled_header
    table["rows"] = shuffled_rows

    return table


def shuffle_rows(table: Table, seed=0) -> Table:
    # extract rows from the dictionary
    rows = table.get("rows", [])
    # shuffle rows
    random_generator = new_random_generator({"table": table, "seed": seed})
    random_generator.shuffle(rows)
    table["rows"] = rows

    return table


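The two helpers above shuffle a table in place, driven by unitxt's `new_random_generator`, which seeds on the table content itself so the shuffle is deterministic per instance. As a standalone sketch of the column-shuffle logic, substituting Python's `random.Random` for unitxt's generator (an assumption for illustration only):

```python
import random


def shuffle_columns_sketch(table: dict, seed: int = 0) -> dict:
    # mirror shuffle_columns: permute the header indices, then reorder
    # every row with the same permutation so rows stay aligned
    header = table.get("header", [])
    rows = table.get("rows", [])
    indices = list(range(len(header)))
    random.Random(seed).shuffle(indices)  # stand-in for new_random_generator
    table["header"] = [header[i] for i in indices]
    table["rows"] = [[row[i] for i in indices] for row in rows]
    return table


table = {"header": ["name", "age"], "rows": [["Alex", 26], ["Raj", 34]]}
shuffled = shuffle_columns_sketch(table, seed=0)
# whatever the permutation, each row stays aligned with the permuted header
```

Shuffling the permutation indices, rather than header and rows separately, is what keeps every row consistent with the header.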
class SerializeTable(ABC, TableSerializer):
    """TableSerializer converts a given table into a flat sequence with special symbols.

    The output format varies depending on the chosen serializer. This abstract class defines the structure of a typical table serializer that any concrete implementation should follow.
    """

    seed: int = 0
    shuffle_rows: bool = False
    shuffle_columns: bool = False

    def serialize(self, value: Table, instance: Dict[str, Any]) -> str:
        value = recursive_copy(value)
        if self.shuffle_columns:
            value = shuffle_columns(table=value, seed=self.seed)

        if self.shuffle_rows:
            value = shuffle_rows(table=value, seed=self.seed)

        return self.serialize_table(value)

    # main method to serialize a table
    @abstractmethod
    def serialize_table(self, table_content: Dict) -> str:
        pass

    # method to process the table header
    def process_header(self, header: List):
        pass

    # method to process a table row
    def process_row(self, row: List, row_index: int):
        pass


# Concrete classes implementing table serializers
class SerializeTableAsIndexedRowMajor(SerializeTable):
    """Indexed Row Major Table Serializer.

    Commonly used row-major serialization format.
    Format:  col : col1 | col2 | col3 row 1 : val1 | val2 | val3 | val4 row 2 : val1 | ...
    """

    # main method that processes a table
    # table_content must be in the prescribed input format
    def serialize_table(self, table_content: Dict) -> str:
        # Extract header and rows from the dictionary
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        # Process the table header first
        serialized_tbl_str = self.process_header(header) + " "

        # Process rows sequentially starting from row 1
        for i, row in enumerate(rows, start=1):
            serialized_tbl_str += self.process_row(row, row_index=i) + " "

        # return the serialized table as a string
        return serialized_tbl_str.strip()

    # serialize the header into a string of column names separated by '|'
    def process_header(self, header: List):
        return "col : " + " | ".join(header)

    # serialize a table row into a string of cell values separated by '|'
    def process_row(self, row: List, row_index: int):
        serialized_row_str = ""
        row_cell_values = [
            str(value) if isinstance(value, (int, float)) else value for value in row
        ]
        serialized_row_str += " | ".join([str(value) for value in row_cell_values])

        return f"row {row_index} : {serialized_row_str}"


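A minimal standalone sketch of the indexed row-major format that the class above produces, with the logic restated outside the operator hierarchy (the function name is chosen here for illustration):

```python
def serialize_indexed_row_major(table: dict) -> str:
    # header first ("col : a | b"), then each row prefixed with its 1-based index
    parts = ["col : " + " | ".join(table["header"])]
    for i, row in enumerate(table["rows"], start=1):
        parts.append(f"row {i} : " + " | ".join(str(v) for v in row))
    return " ".join(parts)


table = {"header": ["name", "age"], "rows": [["Alex", 26], ["Diana", 34]]}
serialized = serialize_indexed_row_major(table)
# → "col : name | age row 1 : Alex | 26 row 2 : Diana | 34"
```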
class SerializeTableAsMarkdown(SerializeTable):
    """Markdown Table Serializer.

    The Markdown table format is primarily used on GitHub.
    Format:

    .. code-block:: text

        |col1|col2|col3|
        |---|---|---|
        |A|4|1|
        |I|2|1|
        ...

    """

    # main method that serializes a table.
    # table_content must be in the prescribed input format.
    def serialize_table(self, table_content: Dict) -> str:
        # Extract header and rows from the dictionary
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        # Process the table header first
        serialized_tbl_str = self.process_header(header)

        # Process rows sequentially starting from row 1
        for i, row in enumerate(rows, start=1):
            serialized_tbl_str += self.process_row(row, row_index=i)

        # return the serialized table as a string
        return serialized_tbl_str.strip()

    # serialize the header into a string containing the list of column names
    def process_header(self, header: List):
        header_str = "|{}|\n".format("|".join(header))
        header_str += "|{}|\n".format("|".join(["---"] * len(header)))
        return header_str

    # serialize a table row into a string containing the list of cell values
    def process_row(self, row: List, row_index: int):
        row_str = ""
        row_str += "|{}|\n".format("|".join(str(cell) for cell in row))
        return row_str


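The Markdown serialization above can be sketched as a single standalone function (a simplification for illustration; the operator splits header and row handling into overridable methods):

```python
def serialize_markdown(table: dict) -> str:
    # header row, then the "---" separator row, then one line per data row
    out = "|{}|\n".format("|".join(table["header"]))
    out += "|{}|\n".format("|".join(["---"] * len(table["header"])))
    for row in table["rows"]:
        out += "|{}|\n".format("|".join(str(c) for c in row))
    return out.strip()


table = {"header": ["col1", "col2"], "rows": [["A", 4], ["I", 2]]}
md = serialize_markdown(table)
# → "|col1|col2|\n|---|---|\n|A|4|\n|I|2|"
```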
class SerializeTableAsDFLoader(SerializeTable):
    """DFLoader Table Serializer.

    Pandas-dataframe-based code snippet serialization format.
    Format (sample):

    .. code-block:: python

        pd.DataFrame({
            "name" : ["Alex", "Diana", "Donald"],
            "age" : [26, 34, 39]
        },
        index=[0,1,2])
    """

    # main method that serializes a table.
    # table_content must be in the prescribed input format.
    def serialize_table(self, table_content: Dict) -> str:
        # Extract header and rows from the dictionary
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        # Fix duplicate columns, ensuring the first occurrence has no suffix
        header = [
            f"{col}_{header[:i].count(col)}" if header[:i].count(col) > 0 else col
            for i, col in enumerate(header)
        ]

        # Create a pandas DataFrame
        df = pd.DataFrame(rows, columns=header)

        # Generate the output string in the desired format
        data_dict = df.to_dict(orient="list")

        return (
            "pd.DataFrame({\n"
            + json.dumps(data_dict)[1:-1]
            + "},\nindex="
            + str(list(range(len(rows))))
            + ")"
        )


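The duplicate-column fix above deserves a closer look: the first occurrence of a name keeps it unchanged, while each later occurrence gets a suffix counting its prior duplicates. A stdlib-only sketch of that comprehension (pandas is not needed to see the behavior; the function name is illustrative):

```python
def dedup_header(header: list) -> list:
    # suffix each repeated column name with the count of its prior occurrences;
    # the first occurrence is left untouched
    return [
        f"{col}_{header[:i].count(col)}" if header[:i].count(col) > 0 else col
        for i, col in enumerate(header)
    ]


deduped = dedup_header(["id", "name", "id", "id"])
# → ["id", "name", "id_1", "id_2"]
```

Without this step, `pd.DataFrame(rows, columns=header)` would silently create duplicate column labels, and `to_dict(orient="list")` would drop all but one of them.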
class SerializeTableAsJson(SerializeTable):
    """JSON Table Serializer.

    JSON-format-based serializer.
    Format (sample):

    .. code-block:: json

        {
            "0":{"name":"Alex","age":26},
            "1":{"name":"Diana","age":34},
            "2":{"name":"Donald","age":39}
        }
    """

    # main method that serializes a table.
    # table_content must be in the prescribed input format.
    def serialize_table(self, table_content: Dict) -> str:
        # Extract header and rows from the dictionary
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        # Generate the output dictionary
        output_dict = {}
        for i, row in enumerate(rows):
            output_dict[i] = {header[j]: value for j, value in enumerate(row)}

        # Convert the dictionary to a JSON string
        return json.dumps(output_dict)


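The same index-to-record mapping can be sketched compactly with `zip` (an equivalent restatement, not the operator's exact code; note that `json.dumps` turns the integer row indices into string keys):

```python
import json


def serialize_json_table(table: dict) -> str:
    # map each 0-based row index to a {column: value} record
    out = {i: dict(zip(table["header"], row)) for i, row in enumerate(table["rows"])}
    return json.dumps(out)


table = {"header": ["name", "age"], "rows": [["Alex", 26], ["Diana", 34]]}
js = serialize_json_table(table)
# → '{"0": {"name": "Alex", "age": 26}, "1": {"name": "Diana", "age": 34}}'
```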
class SerializeTableAsHTML(SerializeTable):
    """HTML Table Serializer.

    HTML table format used for rendering tables in web pages.
    Format (sample):

    .. code-block:: html

        <table>
            <thead>
                <tr><th>name</th><th>age</th><th>sex</th></tr>
            </thead>
            <tbody>
                <tr><td>Alice</td><td>26</td><td>F</td></tr>
                <tr><td>Raj</td><td>34</td><td>M</td></tr>
            </tbody>
        </table>
    """

    # main method that serializes a table.
    # table_content must be in the prescribed input format.
    def serialize_table(self, table_content: Dict) -> str:
        # Extract header and rows from the dictionary
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        # Build the HTML table structure
        serialized_tbl_str = "<table>\n"
        serialized_tbl_str += self.process_header(header) + "\n"
        serialized_tbl_str += self.process_rows(rows) + "\n"
        serialized_tbl_str += "</table>"

        return serialized_tbl_str.strip()

    # serialize the header into an HTML <thead> section
    def process_header(self, header: List) -> str:
        header_html = "  <thead>\n    <tr>"
        for col in header:
            header_html += f"<th>{col}</th>"
        header_html += "</tr>\n  </thead>"
        return header_html

    # serialize the rows into an HTML <tbody> section
    def process_rows(self, rows: List[List]) -> str:
        rows_html = "  <tbody>"
        for row in rows:
            rows_html += "\n    <tr>"
            for cell in row:
                rows_html += f"<td>{cell}</td>"
            rows_html += "</tr>"
        rows_html += "\n  </tbody>"
        return rows_html


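A compact standalone sketch of the same `<thead>`/`<tbody>` structure, using generator expressions instead of the operator's string accumulation (equivalent output shape, illustrative names; cell values are not HTML-escaped, matching the operator above):

```python
def serialize_html_table(table: dict) -> str:
    # <thead> with one <th> per column, <tbody> with one <tr> per row
    head = "".join(f"<th>{c}</th>" for c in table["header"])
    body = "".join(
        "\n    <tr>" + "".join(f"<td>{c}</td>" for c in row) + "</tr>"
        for row in table["rows"]
    )
    return (
        "<table>\n  <thead>\n    <tr>" + head + "</tr>\n  </thead>\n"
        "  <tbody>" + body + "\n  </tbody>\n</table>"
    )


table = {"header": ["name", "age"], "rows": [["Alice", 26]]}
html = serialize_html_table(table)
```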
class SerializeTableAsConcatenation(SerializeTable):
    """Concat Serializer.

    Concatenates all table content into one string of header and rows.
    Format (sample):
    name age Alex 26 Diana 34
    """

    def serialize_table(self, table_content: Dict) -> str:
        # Extract header and rows from the dictionary
        header = table_content["header"]
        rows = table_content["rows"]

        assert header and rows, "Incorrect input table format"

        # Process the table header first
        serialized_tbl_str = " ".join([str(i) for i in header])

        # Process rows sequentially
        for row in rows:
            serialized_tbl_str += " " + " ".join([str(i) for i in row])

        # return the serialized table as a string
        return serialized_tbl_str.strip()


class SerializeTableAsImage(SerializeTable):
    _requirements_list = ["matplotlib", "pillow"]

    def serialize_table(self, table_content: Dict) -> str:
        raise NotImplementedError()

    def serialize(self, value: Table, instance: Dict[str, Any]) -> str:
        table_content = recursive_copy(value)
        if self.shuffle_columns:
            table_content = shuffle_columns(table=table_content, seed=self.seed)

        if self.shuffle_rows:
            table_content = shuffle_rows(table=table_content, seed=self.seed)

        import io

        import matplotlib.pyplot as plt
        import pandas as pd
        from PIL import Image

        # Extract header and rows from the dictionary
        header = table_content.get("header", [])
        rows = table_content.get("rows", [])

        assert header and rows, "Incorrect input table format"

        # Fix duplicate columns, ensuring the first occurrence has no suffix
        header = [
            f"{col}_{header[:i].count(col)}" if header[:i].count(col) > 0 else col
            for i, col in enumerate(header)
        ]

        # Create a pandas DataFrame
        df = pd.DataFrame(rows, columns=header)

        # Rename any remaining duplicate columns by position
        df.columns = [
            f"{col}_{i}" if df.columns.duplicated()[i] else col
            for i, col in enumerate(df.columns)
        ]

        # Create a matplotlib table
        plt.rcParams["font.family"] = "Serif"
        fig, ax = plt.subplots(figsize=(len(header) * 1.5, len(rows) * 0.5))
        ax.axis("off")  # Turn off the axes

        table = pd.plotting.table(ax, df, loc="center", cellLoc="center")
        table.auto_set_column_width(col=range(len(df.columns)))
        table.scale(1.5, 1.5)

        # Save the plot to a BytesIO buffer
        buf = io.BytesIO()
        plt.savefig(buf, format="png", bbox_inches="tight", dpi=150)
        plt.close(fig)  # Close the figure to free up memory
        buf.seek(0)

        # Load the image from the buffer using PIL
        image = Image.open(buf)
        return ImageSerializer().serialize({"image": image, "format": "png"}, instance)


# truncate a cell value to the maximum allowed length;
# returns the truncated string, or None when no truncation applies
def truncate_cell(cell_value, max_len):
    if cell_value is None:
        return None

    if isinstance(cell_value, (int, float)):
        return None

    if cell_value.strip() == "":
        return None

    if len(cell_value) > max_len:
        return cell_value[:max_len]

    return None


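Note the contract of `truncate_cell`: it returns a value only when truncation actually happened, and `None` in every other case (numbers, blanks, already-short strings), which callers interpret as "keep the original cell". A standalone restatement of that contract for illustration:

```python
def truncate_cell_sketch(cell_value, max_len):
    # returns the truncated string, or None meaning "leave the cell alone"
    if cell_value is None:
        return None                  # nothing to truncate
    if isinstance(cell_value, (int, float)):
        return None                  # numbers are never truncated
    if cell_value.strip() == "":
        return None                  # blank strings are left alone
    if len(cell_value) > max_len:
        return cell_value[:max_len]  # the only case that returns a value
    return None                      # short enough already
```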
class TruncateTableCells(InstanceOperator):
    """Limit the maximum length of cell values in a table to reduce the overall length.

    Args:
        max_length (int): maximum allowed length of cell values.

    For tasks that produce a cell value as the answer, truncating a cell value must be
    mirrored by truncating the corresponding answer as well. This is addressed in the
    implementation.
    """

    max_length: int = 15
    table: str = None
    text_output: Optional[str] = None

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        table = dict_get(instance, self.table)

        answers = []
        if self.text_output is not None:
            answers = dict_get(instance, self.text_output)

        self.truncate_table(table_content=table, answers=answers)

        return instance

    # truncate table cells
    def truncate_table(self, table_content: Dict, answers: Optional[List]):
        cell_mapping = {}

        # One row at a time
        for row in table_content.get("rows", []):
            for i, cell in enumerate(row):
                truncated_cell = truncate_cell(cell, self.max_length)
                if truncated_cell is not None:
                    cell_mapping[cell] = truncated_cell
                    row[i] = truncated_cell

        # Update values in the answer list to the truncated values
        if answers is not None:
            for i, case in enumerate(answers):
                answers[i] = cell_mapping.get(case, case)


class TruncateTableRows(FieldOperator):
    """Limits table rows to a specified limit by removing excess rows via random selection.

    Args:
        rows_to_keep (int): number of rows to keep.
    """

    rows_to_keep: int = 10

    def process_value(self, table: Any) -> Any:
        return self.truncate_table_rows(table_content=table)

    def truncate_table_rows(self, table_content: Dict):
        # Get rows from the table
        rows = table_content.get("rows", [])

        num_rows = len(rows)

        # if the number of rows is already within the limit, return
        if num_rows <= self.rows_to_keep:
            return table_content

        # calculate the number of rows to delete
        rows_to_delete = num_rows - self.rows_to_keep

        # Randomly select rows to be deleted
        deleted_rows_indices = random.sample(range(len(rows)), rows_to_delete)

        remaining_rows = [
            row for i, row in enumerate(rows) if i not in deleted_rows_indices
        ]
        table_content["rows"] = remaining_rows

        return table_content


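A key property of the row-truncation logic above is that it samples *indices to delete* and then filters, so the surviving rows keep their original relative order. A standalone sketch of that approach (illustrative name; uses the module-level `random`, so which rows survive is nondeterministic):

```python
import random


def truncate_rows_sketch(table: dict, rows_to_keep: int = 10) -> dict:
    # delete randomly chosen rows until only rows_to_keep remain,
    # preserving the relative order of the survivors
    rows = table.get("rows", [])
    if len(rows) <= rows_to_keep:
        return table
    doomed = set(random.sample(range(len(rows)), len(rows) - rows_to_keep))
    table["rows"] = [row for i, row in enumerate(rows) if i not in doomed]
    return table


table = {"header": ["n"], "rows": [[i] for i in range(100)]}
truncated = truncate_rows_sketch(table, rows_to_keep=10)
# 10 rows remain, still in ascending order
```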
class GetNumOfTableCells(FieldOperator):
    """Get the number of cells in the given table."""

    def process_value(self, table: Any) -> Any:
        num_of_rows = len(table.get("rows"))
        num_of_cols = len(table.get("header"))
        return num_of_rows * num_of_cols


class SerializeTableRowAsText(InstanceOperator):
    """Serializes a table row as text.

    Args:
        fields (str): list of fields to be included in the serialization.
        to_field (str): serialized text field name.
        max_cell_length (int): limits the cell length to be considered, optional.
    """

    fields: str
    to_field: str
    max_cell_length: Optional[int] = None

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        linearized_str = ""
        for field in self.fields:
            value = dict_get(instance, field)
            if self.max_cell_length is not None:
                truncated_value = truncate_cell(value, self.max_cell_length)
                if truncated_value is not None:
                    value = truncated_value

            linearized_str = linearized_str + field + " is " + str(value) + ", "

        instance[self.to_field] = linearized_str
        return instance


class SerializeTableRowAsList(InstanceOperator):
    """Serializes a table row as a list.

    Args:
        fields (str): list of fields to be included in the serialization.
        to_field (str): serialized text field name.
        max_cell_length (int): limits the cell length to be considered, optional.
    """

    fields: str
    to_field: str
    max_cell_length: Optional[int] = None

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        linearized_str = ""
        for field in self.fields:
            value = dict_get(instance, field)
            if self.max_cell_length is not None:
                truncated_value = truncate_cell(value, self.max_cell_length)
                if truncated_value is not None:
                    value = truncated_value

            linearized_str = linearized_str + field + ": " + str(value) + ", "

        instance[self.to_field] = linearized_str
        return instance


class SerializeTriples(FieldOperator):
    """Serializes triples into a flat sequence.

    Sample input in the expected format:
    [["First Clearing", "LOCATION", "On NYS 52 1 Mi. Youngsville"], ["On NYS 52 1 Mi. Youngsville", "CITY_OR_TOWN", "Callicoon, New York"]]

    Sample output (note that the relation is lowercased):
    First Clearing : location : On NYS 52 1 Mi. Youngsville | On NYS 52 1 Mi. Youngsville : city_or_town : Callicoon, New York
    """

    def process_value(self, tripleset: Any) -> Any:
        return self.serialize_triples(tripleset)

    def serialize_triples(self, tripleset) -> str:
        return " | ".join(
            f"{subj} : {rel.lower()} : {obj}" for subj, rel, obj in tripleset
        )


class SerializeKeyValPairs(FieldOperator):
    """Serializes key-value pairs into a flat sequence.

    Sample input in the expected format: {"name": "Alex", "age": 31, "sex": "M"}
    Sample output: name is Alex, age is 31, sex is M
    """

    def process_value(self, kvpairs: Any) -> Any:
        return self.serialize_kvpairs(kvpairs)

    def serialize_kvpairs(self, kvpairs) -> str:
        serialized_str = ""
        for key, value in kvpairs.items():
            serialized_str += f"{key} is {value}, "

        # Remove the trailing comma and space, then return
        return serialized_str[:-2]


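The accumulate-then-slice pattern above is equivalent to joining "key is value" fragments with `", "`, which a standalone sketch makes explicit (illustrative name; relies on dicts preserving insertion order, as Python 3.7+ guarantees):

```python
def serialize_kvpairs_sketch(kvpairs: dict) -> str:
    # join "key is value" fragments with ", " — same result as building the
    # string with a trailing ", " and slicing the last two characters off
    return ", ".join(f"{key} is {value}" for key, value in kvpairs.items())


text = serialize_kvpairs_sketch({"name": "Alex", "age": 31, "sex": "M"})
# → "name is Alex, age is 31, sex is M"
```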
class ListToKeyValPairs(InstanceOperator):
    """Maps a list of keys and a list of values into key:value pairs.

    Sample input in the expected format: {"keys": ["name", "age", "sex"], "values": ["Alex", 31, "M"]}
    Sample output: {"name": "Alex", "age": 31, "sex": "M"}
    """

    fields: List[str]
    to_field: str

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        keylist = dict_get(instance, self.fields[0])
        valuelist = dict_get(instance, self.fields[1])

        output_dict = {}
        for key, value in zip(keylist, valuelist):
            output_dict[key] = value

        instance[self.to_field] = output_dict

        return instance


class ConvertTableColNamesToSequential(FieldOperator):
    """Replaces the actual table column names with static sequential names like col_0, col_1, ...

    .. code-block:: text

        Sample input:
        {
            "header": ["name", "age"],
            "rows": [["Alex", 21], ["Donald", 34]]
        }

        Sample output:
        {
            "header": ["col_0", "col_1"],
            "rows": [["Alex", 21], ["Donald", 34]]
        }
    """

    def process_value(self, table: Any) -> Any:
        table_input = recursive_copy(table)
        return self.replace_header(table_content=table_input)

    # replaces the header with sequential column names
    def replace_header(self, table_content: Dict) -> str:
        # Extract the header from the dictionary
        header = table_content.get("header", [])

        assert header, "Input table missing header"

        new_header = ["col_" + str(i) for i in range(len(header))]
        table_content["header"] = new_header

        return table_content


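The operator above copies the table before editing so the caller's data is untouched. A standalone sketch of the same idea, using a plain manual copy in place of unitxt's `recursive_copy` (an assumption for illustration):

```python
def sequential_header_sketch(table: dict) -> dict:
    # copy first so the caller's table is untouched, then replace every
    # column name with col_<position>; rows are unchanged
    table = {"header": list(table["header"]), "rows": [list(r) for r in table["rows"]]}
    table["header"] = [f"col_{i}" for i in range(len(table["header"]))]
    return table


original = {"header": ["name", "age"], "rows": [["Alex", 21], ["Donald", 34]]}
out = sequential_header_sketch(original)
# → header becomes ["col_0", "col_1"]; rows and the original table are unchanged
```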
class ShuffleTableRows(TypeDependentAugmentor):
    """Shuffles the input table rows randomly.

    .. code-block:: text

        Sample Input:
        {
            "header": ["name", "age"],
            "rows": [["Alex", 26], ["Raj", 34], ["Donald", 39]],
        }

        Sample Output:
        {
            "header": ["name", "age"],
            "rows": [["Donald", 39], ["Raj", 34], ["Alex", 26]],
        }
    """

    augmented_type = Table
    seed = 0

    def process_value(self, table: Any) -> Any:
        table_input = recursive_copy(table)
        return shuffle_rows(table_input, self.seed)


class ShuffleTableColumns(TypeDependentAugmentor):
    """Shuffles the table columns randomly.

    .. code-block:: text

        Sample Input:
            {
                "header": ["name", "age"],
                "rows": [["Alex", 26], ["Raj", 34], ["Donald", 39]],
            }

        Sample Output:
            {
                "header": ["age", "name"],
                "rows": [[26, "Alex"], [34, "Raj"], [39, "Donald"]],
            }
    """

    augmented_type = Table
    seed = 0

    def process_value(self, table: Any) -> Any:
        table_input = recursive_copy(table)
        return shuffle_columns(table_input, self.seed)


class LoadJson(FieldOperator):
    failure_value: Any = None
    allow_failure: bool = False

    def process_value(self, value: str) -> Any:
        if self.allow_failure:
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                return self.failure_value
        else:
            return json.loads(value, strict=False)


class PythonCallProcessor(FieldOperator):
    def process_value(self, value: str) -> ToolCall:
        expr = ast.parse(value, mode="eval").body
        function = expr.func.id
        args = {}
        for kw in expr.keywords:
            args[kw.arg] = ast.literal_eval(kw.value)
        # Handle positional args, if any
        if expr.args:
            args["_args"] = [ast.literal_eval(arg) for arg in expr.args]
        return {"name": function, "arguments": args}


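The `ast`-based call parsing above can be exercised standalone: parse a Python-style call string in `eval` mode, read keyword arguments with `ast.literal_eval`, and collect any positional arguments under `"_args"`. A sketch mirroring that logic (the call string and function name are made up for illustration):

```python
import ast


def parse_call(value: str) -> dict:
    # parse a single call expression; keyword args via literal_eval,
    # positional args collected under "_args", as in PythonCallProcessor
    expr = ast.parse(value, mode="eval").body
    args = {kw.arg: ast.literal_eval(kw.value) for kw in expr.keywords}
    if expr.args:
        args["_args"] = [ast.literal_eval(arg) for arg in expr.args]
    return {"name": expr.func.id, "arguments": args}


call = parse_call('get_weather("Paris", units="celsius")')
# → {"name": "get_weather", "arguments": {"units": "celsius", "_args": ["Paris"]}}
```

Using `ast.literal_eval` rather than `eval` restricts argument values to Python literals, so arbitrary code in a model-generated call string is never executed.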
def extract_possible_json_str(text):
    """Extract a potential JSON string from text by finding the outermost braces/brackets."""
    # Find the first opening delimiter
    start_positions = [pos for pos in [text.find("{"), text.find("[")] if pos != -1]
    start = min(start_positions) if start_positions else 0

    # Find the last closing delimiter
    end_positions = [pos for pos in [text.rfind("}"), text.rfind("]")] if pos != -1]
    end = max(end_positions) if end_positions else len(text) - 1

    return text[start : end + 1]


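This extraction is how wrapper tokens around a model's tool-call JSON get stripped before parsing: everything outside the first opening and last closing brace/bracket is discarded. A standalone sketch of the same slicing (illustrative name; the `<tool_call>` wrapper is one example of the tokens it removes):

```python
def extract_json_sketch(text: str) -> str:
    # slice from the first "{" or "[" to the last "}" or "]"
    starts = [p for p in (text.find("{"), text.find("[")) if p != -1]
    ends = [p for p in (text.rfind("}"), text.rfind("]")) if p != -1]
    start = min(starts) if starts else 0
    end = max(ends) if ends else len(text) - 1
    return text[start : end + 1]


cleaned = extract_json_sketch('<tool_call>{"name": "ping", "arguments": {}}</tool_call>')
# → '{"name": "ping", "arguments": {}}'
```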
class ToolCallPostProcessor(FieldOperator):
    failure_value: Any = None
    allow_failure: bool = False

    def process_value(self, value: str) -> ToolCall:
        # clear tokens such as <tool_call>, focusing on the call JSON itself
        value = extract_possible_json_str(value)
        if self.allow_failure:
            try:
                result = json.loads(value)
            except json.JSONDecodeError:
                return self.failure_value
        else:
            result = json.loads(value, strict=False)
        if isoftype(result, List[ToolCall]):
            if len(result) > 1:
                UnitxtWarning(f"More than one tool call returned from model: {result}")
                return self.failure_value
            if len(result) == 0:
                return self.failure_value
            return result[0]
        if not isoftype(result, ToolCall):
            return self.failure_value
        return result


class MultipleToolCallPostProcessor(FieldOperator):
    failure_value: Any = None
    allow_failure: bool = False

    def process_value(self, value: str) -> List[ToolCall]:
        if self.allow_failure:
            try:
                result = json.loads(value)
            except json.JSONDecodeError:
                return self.failure_value
        else:
            result = json.loads(value, strict=False)
        if isoftype(result, List[ToolCall]):
            return result
        if not isoftype(result, ToolCall):
            return self.failure_value
        return [result]


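The multi-call variant normalizes everything to a list. A minimal sketch of the same flow (again with plain dicts in place of `ToolCall`, and a hypothetical function name):

```python
import json


def postprocess_tool_calls(value, failure_value=None):
    # Always return a list of calls: a single call is wrapped, bad JSON fails.
    try:
        result = json.loads(value)
    except json.JSONDecodeError:
        return failure_value
    if isinstance(result, list):
        return result
    if isinstance(result, dict):
        return [result]
    return failure_value


calls = postprocess_tool_calls('{"name": "sqrt", "arguments": {"x": 9}}')
```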
class DumpJson(FieldOperator):
    def process_value(self, value: str) -> str:
        return json.dumps(value)


class MapHTMLTableToJSON(FieldOperator):
    """Converts an HTML table to the basic JSON table format.

    JSON format:

    .. code-block:: json

        {
            "header": ["col1", "col2"],
            "rows": [["row11", "row12"], ["row21", "row22"], ["row31", "row32"]]
        }
    """

    _requirements_list = ["bs4"]

    def process_value(self, table: Any) -> Any:
        return self.convert_to_json(table_content=table)

    def convert_to_json(self, table_content: str) -> Dict:
        from bs4 import BeautifulSoup

        soup = BeautifulSoup(table_content, "html.parser")

        # Extract the header
        header = []
        header_cells = soup.find("thead").find_all("th")
        for cell in header_cells:
            header.append(cell.get_text())

        # Extract the rows
        rows = []
        for row in soup.find("tbody").find_all("tr"):
            row_data = []
            for cell in row.find_all("td"):
                row_data.append(cell.get_text())
            rows.append(row_data)

        return {"header": header, "rows": rows}


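The operator itself relies on BeautifulSoup; the same `<thead>`/`<tbody>` walk can be sketched dependency-free with the stdlib `html.parser`, which may help clarify what the conversion extracts:

```python
from html.parser import HTMLParser


class TableToJSON(HTMLParser):
    # Minimal stdlib sketch of the HTML-table-to-JSON conversion above.
    def __init__(self):
        super().__init__()
        self.header, self.rows = [], []
        self._row = None
        self._cell = None

    def handle_starttag(self, tag, attrs):
        if tag == "tr":
            self._row = []
        elif tag in ("th", "td"):
            self._cell = ""

    def handle_data(self, data):
        if self._cell is not None:
            self._cell += data

    def handle_endtag(self, tag):
        if tag == "th":
            self.header.append(self._cell)
            self._cell = None
        elif tag == "td":
            self._row.append(self._cell)
            self._cell = None
        elif tag == "tr" and self._row:
            self.rows.append(self._row)


html = (
    "<table><thead><tr><th>col1</th><th>col2</th></tr></thead>"
    "<tbody><tr><td>a</td><td>b</td></tr></tbody></table>"
)
parser = TableToJSON()
parser.feed(html)
result = {"header": parser.header, "rows": parser.rows}
```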
class MapTableListsToStdTableJSON(FieldOperator):
    """Converts a list-of-lists table to the basic JSON table format.

    JSON format:

    .. code-block:: json

        {
            "header": ["col1", "col2"],
            "rows": [["row11", "row12"], ["row21", "row22"], ["row31", "row32"]]
        }
    """

    def process_value(self, table: Any) -> Any:
        return self.map_tablelists_to_stdtablejson_util(table_content=table)

    def map_tablelists_to_stdtablejson_util(self, table_content: str) -> Dict:
        return {"header": table_content[0], "rows": table_content[1:]}


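The mapping is a one-liner: the first inner list becomes the header, the rest become rows. A standalone sketch (function name is illustrative):

```python
def lists_to_table(table_rows):
    # First inner list is the header; the remaining lists are the data rows.
    return {"header": table_rows[0], "rows": table_rows[1:]}


table = lists_to_table([["name", "age"], ["Alice", 26], ["Raj", 34]])
```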
class ConstructTableFromRowsCols(InstanceOperator):
    """Maps column and row fields into a single table field encompassing both header and rows.

    fields[0] = header, as a string encoding a list
    fields[1] = rows, as a string encoding a list of lists
    fields[2] = table caption string (optional)
    """

    fields: List[str]
    to_field: str

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        import ast

        header = dict_get(instance, self.fields[0])
        rows = dict_get(instance, self.fields[1])

        caption = instance[self.fields[2]] if len(self.fields) >= 3 else None

        header_processed = ast.literal_eval(header)
        rows_processed = ast.literal_eval(rows)

        output_dict = {"header": header_processed, "rows": rows_processed}

        if caption is not None:
            output_dict["caption"] = caption

        instance[self.to_field] = output_dict

        return instance


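Since the header and rows arrive as Python-literal strings, `ast.literal_eval` safely parses them back into lists. A standalone sketch of that step (function name is illustrative, not part of the module):

```python
import ast


def construct_table(header_str, rows_str, caption=None):
    # The header and rows are Python-literal strings; parse them back into lists.
    output = {
        "header": ast.literal_eval(header_str),
        "rows": ast.literal_eval(rows_str),
    }
    if caption is not None:
        output["caption"] = caption
    return output


table = construct_table("['name', 'age']", "[['Alice', 26], ['Raj', 34]]", caption="People")
```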
class TransposeTable(TypeDependentAugmentor):
    """Transpose a table.

    .. code-block:: text

        Sample Input:
            {
                "header": ["name", "age", "sex"],
                "rows": [["Alice", 26, "F"], ["Raj", 34, "M"], ["Donald", 39, "M"]],
            }

        Sample Output:
            {
                "header": [" ", "0", "1", "2"],
                "rows": [["name", "Alice", "Raj", "Donald"], ["age", 26, 34, 39], ["sex", "F", "M", "M"]],
            }
    """

    augmented_type = Table

    def process_value(self, table: Any) -> Any:
        return self.transpose_table(table)

    def transpose_table(self, table: Dict) -> Dict:
        # Extract the header and rows from the table object
        header = table["header"]
        rows = table["rows"]

        # Transpose the table by converting rows to columns and vice versa
        transposed_header = [" "] + [str(i) for i in range(len(rows))]
        transposed_rows = [
            [header[i]] + [row[i] for row in rows] for i in range(len(header))
        ]

        return {"header": transposed_header, "rows": transposed_rows}


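The docstring's sample can be checked directly against a standalone copy of the transpose logic, which assumes a rectangular table (every row as long as the header):

```python
def transpose_table(table):
    header, rows = table["header"], table["rows"]
    # New header: a blank corner cell plus one numeric label per original row.
    transposed_header = [" "] + [str(i) for i in range(len(rows))]
    # Each original column becomes a row, led by its original column name.
    transposed_rows = [
        [header[i]] + [row[i] for row in rows] for i in range(len(header))
    ]
    return {"header": transposed_header, "rows": transposed_rows}


out = transpose_table({
    "header": ["name", "age", "sex"],
    "rows": [["Alice", 26, "F"], ["Raj", 34, "M"], ["Donald", 39, "M"]],
})
```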
class DuplicateTableRows(TypeDependentAugmentor):
    """Duplicates specified rows of a table a given number of times.

    Args:
        row_indices (List[int]): indices of the rows to duplicate

        times (int): number of times each selected row should appear
    """

    augmented_type = Table

    row_indices: List[int] = []
    times: int = 1

    def process_value(self, table: Any) -> Any:
        # Extract the header and rows from the table
        header = table["header"]
        rows = table["rows"]

        # Duplicate only the specified rows
        duplicated_rows = []
        for i, row in enumerate(rows):
            if i in self.row_indices:
                duplicated_rows.extend([row] * self.times)  # Duplicate the selected row
            else:
                duplicated_rows.append(row)  # Leave other rows unchanged

        # Return the new table with selectively duplicated rows
        return {"header": header, "rows": duplicated_rows}


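A standalone sketch of the selective row duplication (function name is illustrative):

```python
def duplicate_rows(table, row_indices, times):
    duplicated = []
    for i, row in enumerate(table["rows"]):
        # Rows at the selected indices appear `times` times; others once.
        duplicated.extend([row] * times if i in row_indices else [row])
    return {"header": table["header"], "rows": duplicated}


out = duplicate_rows(
    {"header": ["a"], "rows": [[1], [2], [3]]}, row_indices=[1], times=3
)
```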
class DuplicateTableColumns(TypeDependentAugmentor):
    """Duplicates specified columns of a table a given number of times.

    Args:
        column_indices (List[int]): indices of the columns to duplicate

        times (int): number of times each selected column should appear
    """

    augmented_type = Table

    column_indices: List[int] = []
    times: int = 1

    def process_value(self, table: Any) -> Any:
        # Extract the header and rows from the table
        header = table["header"]
        rows = table["rows"]

        # Duplicate the specified columns in the header
        duplicated_header = []
        for i, col in enumerate(header):
            if i in self.column_indices:
                duplicated_header.extend([col] * self.times)
            else:
                duplicated_header.append(col)

        # Duplicate the specified columns in each row
        duplicated_rows = []
        for row in rows:
            new_row = []
            for i, value in enumerate(row):
                if i in self.column_indices:
                    new_row.extend([value] * self.times)
                else:
                    new_row.append(value)
            duplicated_rows.append(new_row)

        # Return the new table with selectively duplicated columns
        return {"header": duplicated_header, "rows": duplicated_rows}


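The header and every row are expanded the same way, so the logic factors into one helper applied to each sequence. A standalone sketch:

```python
def duplicate_columns(table, column_indices, times):
    def expand(seq):
        # Values at the selected indices appear `times` times; others once.
        out = []
        for i, v in enumerate(seq):
            out.extend([v] * times if i in column_indices else [v])
        return out

    return {
        "header": expand(table["header"]),
        "rows": [expand(row) for row in table["rows"]],
    }


out = duplicate_columns(
    {"header": ["a", "b"], "rows": [[1, 2], [3, 4]]}, column_indices=[0], times=2
)
```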
class InsertEmptyTableRows(TypeDependentAugmentor):
    """Inserts empty rows into a table at random positions.

    Args:
        times (int): number of empty rows to insert
    """

    augmented_type = Table

    times: int = 0

    def process_value(self, table: Any) -> Any:
        # Extract the header and rows from the table
        header = table["header"]
        rows = table["rows"]

        # Insert empty rows at random positions
        for _ in range(self.times):
            # Create an empty row with the same number of columns
            empty_row = [""] * len(header)
            # Pick a random position at which to insert it
            insert_pos = random.randint(0, len(rows))
            rows.insert(insert_pos, empty_row)

        # Return the modified table
        return {"header": header, "rows": rows}


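A standalone sketch of the insertion; unlike the operator, it takes an explicit seed for reproducibility and copies the rows so the input table is not mutated (both are additions for the sketch, not features of the operator):

```python
import random


def insert_empty_rows(table, times, seed=None):
    rng = random.Random(seed)  # seedable; the operator uses module-level random
    rows = [list(r) for r in table["rows"]]  # copy so the input is not mutated
    for _ in range(times):
        empty_row = [""] * len(table["header"])
        rows.insert(rng.randint(0, len(rows)), empty_row)
    return {"header": table["header"], "rows": rows}


out = insert_empty_rows({"header": ["a", "b"], "rows": [[1, 2], [3, 4]]}, times=2, seed=0)
```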
class MaskColumnsNames(TypeDependentAugmentor):
    """Mask the names of table columns with dummy names "Col1", "Col2", etc."""

    augmented_type = Table

    def process_value(self, table: Any) -> Any:
        masked_header = ["Col" + str(ind + 1) for ind in range(len(table["header"]))]

        return {"header": masked_header, "rows": table["rows"]}


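A standalone sketch of the masking (function name is illustrative):

```python
def mask_column_names(table):
    # Replace each column name with a positional dummy: Col1, Col2, ...
    masked = ["Col" + str(i + 1) for i in range(len(table["header"]))]
    return {"header": masked, "rows": table["rows"]}


out = mask_column_names({"header": ["name", "age"], "rows": [["Alice", 26]]})
```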
class ShuffleColumnsNames(TypeDependentAugmentor):
    """Shuffle table column names to be displayed in random order."""

    augmented_type = Table

    def process_value(self, table: Any) -> Any:
        shuffled_header = list(table["header"])  # copy, so the input table is not mutated
        random.shuffle(shuffled_header)

        return {"header": shuffled_header, "rows": table["rows"]}


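Only the names move; the row values stay in place, so names deliberately stop matching their columns. A seedable standalone sketch (the seed parameter is an addition for the sketch):

```python
import random


def shuffle_column_names(table, seed=None):
    rng = random.Random(seed)
    shuffled = list(table["header"])  # copy so the input header is untouched
    rng.shuffle(shuffled)
    return {"header": shuffled, "rows": table["rows"]}


original = {"header": ["a", "b", "c"], "rows": [[1, 2, 3]]}
out = shuffle_column_names(original, seed=1)
```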
class JsonStrToDict(FieldOperator):
    """Convert a JSON string representing key-value pairs into a dictionary.

    Ensures keys and values are strings, and drops None values.
    """

    def process_value(self, text: str) -> Dict[str, str]:
        try:
            dict_value = json.loads(text)
        except Exception as e:
            UnitxtWarning(
                f"Unable to convert input text to json format in JsonStrToDict due to {e}. Text: {text}"
            )
            dict_value = {}
        if not isoftype(dict_value, Dict[str, Any]):
            UnitxtWarning(
                f"Unable to convert input text to dictionary in JsonStrToDict. Text: {text}"
            )
            dict_value = {}
        return {str(k): str(v) for k, v in dict_value.items() if v is not None}


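A standalone sketch of the same normalization, with `isinstance` standing in for unitxt's `isoftype` check and plain fall-through instead of warnings:

```python
import json


def json_str_to_dict(text):
    # Parse; on failure or a non-dict result, fall back to an empty dict.
    try:
        value = json.loads(text)
    except Exception:
        value = {}
    if not isinstance(value, dict):
        value = {}
    # Coerce keys and values to str and drop None values.
    return {str(k): str(v) for k, v in value.items() if v is not None}


out = json_str_to_dict('{"a": 1, "b": null, "c": "x"}')
```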
class ParseCSV(FieldOperator):
    r"""Parse CSV/TSV text content into table format.

    This operator converts CSV or TSV text content into the standard table format
    used by Unitxt, with header and rows fields.

    Args:
        separator (str): Field separator character. Defaults to ','.
        has_header (bool): Whether the first row contains column headers. Defaults to True.
        skip_header (bool): Whether to skip the first row entirely. Defaults to False.
        strip_cells (bool): Whether to strip whitespace and replace newlines with spaces in each cell. Defaults to False.

    Example:
        Parsing CSV content

        .. code-block:: python

            ParseCSV(field="csv_content", to_field="table", separator=",")

        Parsing TSV content

        .. code-block:: python

            ParseCSV(field="tsv_content", to_field="table", separator="\t")
    """

    separator: str = ","
    has_header: bool = True
    skip_header: bool = False
    dtype: Optional[Literal["str"]] = None
    strip_cells: bool = False

    def process_value(self, value: str) -> Dict[str, Any]:
        csv_reader = csv.reader(
            io.StringIO(value), delimiter=self.separator, quotechar='"'
        )
        rows = []
        header = []
        for idx, row in enumerate(csv_reader):
            if idx == 0 and self.has_header:
                header = row
                if self.skip_header:
                    continue
            else:
                rows.append(row)

        if not self.has_header or self.skip_header:
            header = [f"col_{i}" for i in range(len(rows[0]))]

        if self.strip_cells:

            def clean_cell(x):
                if isinstance(x, str):
                    return x.replace("\n", " ").strip()
                return x

            rows = [[clean_cell(cell) for cell in row] for row in rows]
            header = [clean_cell(h) for h in header]

        return {
            "header": header,
            "rows": rows,
        }
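The core parsing path can be exercised as a standalone function; this sketch mirrors the header/rows logic above (without the `strip_cells` cleanup), with `parse_csv` as an illustrative name:

```python
import csv
import io


def parse_csv(value, separator=",", has_header=True, skip_header=False):
    reader = csv.reader(io.StringIO(value), delimiter=separator, quotechar='"')
    rows, header = [], []
    for idx, row in enumerate(reader):
        if idx == 0 and has_header:
            header = row
            if skip_header:
                continue
        else:
            rows.append(row)
    # Without a usable header row, synthesize positional column names.
    if not has_header or skip_header:
        header = [f"col_{i}" for i in range(len(rows[0]))]
    return {"header": header, "rows": rows}


table = parse_csv("name,age\nAlice,26\nRaj,34")
tsv = parse_csv("a\tb\n1\t2", separator="\t")
```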