• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #758

04 Sep 2023 08:37PM UTC coverage: 0.0% (-78.3%) from 78.333%
#758

push

circle-ci

hershd23
Increased underline length at line 75 in text_summarization.rst
	modified:   docs/source/benchmarks/text_summarization.rst

0 of 11303 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/models/storage/batch.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Callable, Iterable, List, TypeVar, Union
×
16

17
import numpy as np
×
18
import pandas as pd
×
19

20
from evadb.expression.abstract_expression import ExpressionType
×
21
from evadb.parser.alias import Alias
×
22
from evadb.utils.generic_utils import PickleSerializer
×
23
from evadb.utils.logging_manager import logger
×
24

25
# Placeholder binding so that annotations inside the class body below can
# name ``Batch`` before the class statement has finished executing; the class
# definition that follows rebinds this name.
Batch = TypeVar("Batch")
×
26

27

28
class Batch:
×
29
    """
30
    Data model used for storing a batch of frames.
31
    Internally stored as a pandas DataFrame with columns
32
    "id" and "data".
33
    id: integer index of frame
34
    data: frame as np.array
35

36
    Arguments:
37
        frames (DataFrame): pandas Dataframe holding frames data
38
    """
39

40
    def __init__(self, frames=None):
×
41
        self._frames = pd.DataFrame() if frames is None else frames
×
42
        if not isinstance(self._frames, pd.DataFrame):
×
43
            raise ValueError(
44
                "Batch constructor not properly called.\n" "Expected pandas.DataFrame"
45
            )
46

47
    @property
×
48
    def frames(self) -> pd.DataFrame:
×
49
        return self._frames
×
50

51
    def __len__(self):
×
52
        return len(self._frames)
×
53

54
    @property
×
55
    def columns(self):
×
56
        return list(self._frames.columns)
×
57

58
    def column_as_numpy_array(self, column_name: str) -> np.ndarray:
×
59
        """Return a column as numpy array
60

61
        Args:
62
            column_name (str): the name of the required column
63

64
        Returns:
65
            numpy.ndarray: the column data as a numpy array
66
        """
67
        return self._frames[column_name].to_numpy()
×
68

69
    def serialize(self):
        """Serialize the frames and the row count through PickleSerializer."""
        payload = dict(frames=self._frames, batch_size=len(self))
        return PickleSerializer.serialize(payload)
×
72

73
    @classmethod
    def deserialize(cls, data):
        """Rebuild a batch from data produced by ``serialize``.

        Only the "frames" entry of the decoded object is used.
        """
        # NOTE(review): PickleSerializer presumably wraps pickle; if so, only
        # feed it trusted data - confirm against the helper.
        payload = PickleSerializer.deserialize(data)
        frames = payload["frames"]
        return cls(frames=frames)
×
77

78
    @classmethod
×
79
    def from_eq(cls, batch1: Batch, batch2: Batch) -> Batch:
×
80
        return Batch(pd.DataFrame(batch1.to_numpy() == batch2.to_numpy()))
×
81

82
    @classmethod
×
83
    def from_greater(cls, batch1: Batch, batch2: Batch) -> Batch:
×
84
        return Batch(pd.DataFrame(batch1.to_numpy() > batch2.to_numpy()))
×
85

86
    @classmethod
×
87
    def from_lesser(cls, batch1: Batch, batch2: Batch) -> Batch:
×
88
        return Batch(pd.DataFrame(batch1.to_numpy() < batch2.to_numpy()))
×
89

90
    @classmethod
×
91
    def from_greater_eq(cls, batch1: Batch, batch2: Batch) -> Batch:
×
92
        return Batch(pd.DataFrame(batch1.to_numpy() >= batch2.to_numpy()))
×
93

94
    @classmethod
×
95
    def from_lesser_eq(cls, batch1: Batch, batch2: Batch) -> Batch:
×
96
        return Batch(pd.DataFrame(batch1.to_numpy() <= batch2.to_numpy()))
×
97

98
    @classmethod
×
99
    def from_not_eq(cls, batch1: Batch, batch2: Batch) -> Batch:
×
100
        return Batch(pd.DataFrame(batch1.to_numpy() != batch2.to_numpy()))
×
101

102
    @classmethod
×
103
    def compare_contains(cls, batch1: Batch, batch2: Batch) -> None:
×
104
        return cls(
×
105
            pd.DataFrame(
106
                [all(x in p for x in q) for p, q in zip(left, right)]
107
                for left, right in zip(batch1.to_numpy(), batch2.to_numpy())
108
            )
109
        )
110

111
    @classmethod
×
112
    def compare_is_contained(cls, batch1: Batch, batch2: Batch) -> None:
×
113
        return cls(
×
114
            pd.DataFrame(
115
                [all(x in q for x in p) for p, q in zip(left, right)]
116
                for left, right in zip(batch1.to_numpy(), batch2.to_numpy())
117
            )
118
        )
119

120
    @classmethod
×
121
    def compare_like(cls, batch1: Batch, batch2: Batch) -> None:
×
122
        col = batch1._frames.iloc[:, 0]
×
123
        regex = batch2._frames.iloc[:, 0][0]
×
124
        return cls(pd.DataFrame(col.astype("str").str.match(pat=regex)))
×
125

126
    def __str__(self) -> str:
×
127
        with pd.option_context(
×
128
            "display.pprint_nest_depth", 1, "display.max_colwidth", 100
129
        ):
130
            return f"{self._frames}"
×
131

132
    def __eq__(self, other: Batch):
×
133
        # this function does not work if a column is a nested numpy arrays
134
        # (eg, bboxes from yolo).
135
        return self._frames[sorted(self.columns)].equals(
×
136
            other.frames[sorted(other.columns)]
137
        )
138

139
    def __getitem__(self, indices) -> Batch:
×
140
        """
141
        Returns a batch with the desired frames
142

143
        Arguments:
144
            indices (list, slice or mask): list must be
145
            a list of indices; mask is boolean array-like
146
            (i.e. list, NumPy array, DataFrame, etc.)
147
            of appropriate size with True for desired frames.
148
        """
149
        if isinstance(indices, list):
×
150
            return self._get_frames_from_indices(indices)
×
151
        elif isinstance(indices, slice):
×
152
            start = indices.start if indices.start else 0
×
153
            end = indices.stop if indices.stop else len(self.frames)
×
154
            if end < 0:
×
155
                end = len(self._frames) + end
×
156
            step = indices.step if indices.step else 1
×
157
            return self._get_frames_from_indices(range(start, end, step))
×
158
        elif isinstance(indices, int):
×
159
            return self._get_frames_from_indices([indices])
×
160
        else:
161
            raise TypeError("Invalid argument type: {}".format(type(indices)))
162

163
    def _get_frames_from_indices(self, required_frame_ids):
×
164
        new_frames = self._frames.iloc[required_frame_ids, :]
×
165
        new_batch = Batch(new_frames)
×
166
        return new_batch
×
167

168
    def apply_function_expression(self, expr: Callable) -> Batch:
×
169
        """
170
        Execute function expression on frames.
171
        """
172
        self.drop_column_alias()
×
173
        return Batch(expr(self._frames))
×
174

175
    def iterrows(self):
×
176
        return self._frames.iterrows()
×
177

178
    def sort(self, by=None) -> None:
×
179
        """
180
        in_place sort
181
        """
182
        if self.empty():
×
183
            return
×
184
        if by is None:
×
185
            by = self.columns[0]
×
186
        self._frames.sort_values(by=by, ignore_index=True, inplace=True)
×
187

188
    def sort_orderby(self, by, sort_type=None) -> None:
×
189
        """
190
        in_place sort for order_by
191

192
        Args:
193
            by: list of column names
194
            sort_type: list of True/False if ASC for each column name in 'by'
195
                i.e [True, False] means [ASC, DESC]
196
        """
197

198
        if sort_type is None:
×
199
            sort_type = [True]
×
200

201
        assert by is not None
×
202
        for column in by:
×
203
            assert (
×
204
                column in self._frames.columns
205
            ), "Can not orderby non-projected column: {}".format(column)
206

207
        self._frames.sort_values(
×
208
            by, ascending=sort_type, ignore_index=True, inplace=True
209
        )
210

211
    def invert(self) -> None:
×
212
        self._frames = ~self._frames
×
213

214
    def all_true(self) -> bool:
×
215
        return self._frames.all().bool()
×
216

217
    def all_false(self) -> bool:
×
218
        inverted = ~self._frames
×
219
        return inverted.all().bool()
×
220

221
    def create_mask(self) -> List:
×
222
        """
223
        Return list of indices of first row.
224
        """
225
        return self._frames[self._frames[0]].index.tolist()
×
226

227
    def create_inverted_mask(self) -> List:
×
228
        return self._frames[~self._frames[0]].index.tolist()
×
229

230
    def update_indices(self, indices: List, other: Batch):
×
231
        self._frames.iloc[indices] = other._frames
×
232
        self._frames = pd.DataFrame(self._frames)
×
233

234
    def file_paths(self) -> Iterable:
×
235
        yield from self._frames["file_path"]
×
236

237
    def project(self, cols: None) -> Batch:
×
238
        """
239
        Takes as input the column list, returns the projection.
240
        We do a copy for now.
241
        """
242
        cols = cols or []
×
243
        verified_cols = [c for c in cols if c in self._frames]
×
244
        unknown_cols = list(set(cols) - set(verified_cols))
×
245
        assert len(unknown_cols) == 0, unknown_cols
×
246
        return Batch(self._frames[verified_cols])
×
247

248
    @classmethod
×
249
    def merge_column_wise(cls, batches: List[Batch], auto_renaming=False) -> Batch:
×
250
        """
251
        Merge list of batch frames column_wise and return a new batch frame
252
        Arguments:
253
            batches: List[Batch]: list of batch objects to be merged
254
            auto_renaming: if true rename column names if required
255

256
        Returns:
257
            Batch: Merged batch object
258
        """
259
        if not len(batches):
×
260
            return Batch()
×
261

262
        frames = [batch.frames for batch in batches]
×
263

264
        # Check merging matched indices
265
        frames_index = [list(frame.index) for frame in frames]
×
266
        for i, frame_index in enumerate(frames_index):
×
267
            assert (
×
268
                frame_index == frames_index[i - 1]
269
            ), "Merging of DataFrames with unmatched indices can cause undefined behavior"
270

271
        new_frames = pd.concat(frames, axis=1, copy=False, ignore_index=False).fillna(
×
272
            method="ffill"
273
        )
274
        if new_frames.columns.duplicated().any():
×
275
            logger.debug("Duplicated column name detected {}".format(new_frames))
×
276
        return Batch(new_frames)
×
277

278
    def __add__(self, other: Batch) -> Batch:
×
279
        """
280
        Adds two batch frames and return a new batch frame
281
        Arguments:
282
            other (Batch): other framebatch to add
283

284
        Returns:
285
            Batch
286
        """
287
        if not isinstance(other, Batch):
×
288
            raise TypeError("Input should be of type Batch")
289

290
        # Appending a empty dataframe with column name leads to NaN row.
291
        if self.empty():
×
292
            return other
×
293
        if other.empty():
×
294
            return self
×
295

296
        return Batch.concat([self, other], copy=False)
×
297

298
    @classmethod
×
299
    def concat(cls, batch_list: Iterable[Batch], copy=True) -> Batch:
×
300
        """Concat a list of batches.
301
        Notice: only frames are considered.
302
        """
303

304
        # pd.concat will convert generator into list, so it does not hurt
305
        # if we convert ourselves.
306
        frame_list = list([batch.frames for batch in batch_list])
×
307
        if len(frame_list) == 0:
×
308
            return Batch()
×
309
        frame = pd.concat(frame_list, ignore_index=True, copy=copy)
×
310

311
        return Batch(frame)
×
312

313
    @classmethod
×
314
    def stack(cls, batch: Batch, copy=True) -> Batch:
×
315
        """Stack a given batch along the 0th dimension.
316
        Notice: input assumed to contain only one column with video frames
317

318
        Returns:
319
            Batch (always of length 1)
320
        """
321
        if len(batch.columns) > 1:
×
322
            raise ValueError("Stack can only be called on single-column batches")
323
        frame_data_col = batch.columns[0]
×
324
        data_to_stack = batch.frames[frame_data_col].values.tolist()
×
325

326
        if isinstance(data_to_stack[0], np.ndarray) and len(data_to_stack[0].shape) > 1:
×
327
            # if data_to_stack has more than 1 axis, we add a new axis
328
            # [(3, 224, 224) * 10] -> (10, 3, 224, 224)
329
            stacked_array = np.array(batch.frames[frame_data_col].values.tolist())
×
330
        else:
331
            # we concatenate along the zeroth axis
332
            # this makes sense for audio and text
333
            stacked_array = np.hstack(batch.frames[frame_data_col].values)
×
334

335
        stacked_frame = pd.DataFrame([{frame_data_col: stacked_array}])
×
336
        return Batch(stacked_frame)
×
337

338
    @classmethod
×
339
    def join(cls, first: Batch, second: Batch, how="inner") -> Batch:
×
340
        return cls(
×
341
            first._frames.merge(
342
                second._frames, left_index=True, right_index=True, how=how
343
            )
344
        )
345

346
    @classmethod
    def combine_batches(
        cls, first: Batch, second: Batch, expression: ExpressionType
    ) -> Batch:
        """
        Combine the frames of two batches with an arithmetic operator.

        Handles ARITHMETIC_ADD, ARITHMETIC_SUBTRACT, ARITHMETIC_MULTIPLY and
        ARITHMETIC_DIVIDE; any other expression type yields None.
        """
        left, right = first._frames, second._frames
        if expression == ExpressionType.ARITHMETIC_ADD:
            combined = left + right
        elif expression == ExpressionType.ARITHMETIC_SUBTRACT:
            combined = left - right
        elif expression == ExpressionType.ARITHMETIC_MULTIPLY:
            combined = left * right
        elif expression == ExpressionType.ARITHMETIC_DIVIDE:
            combined = left / right
        else:
            return None
        return Batch(pd.DataFrame(combined))
×
361

362
    def reassign_indices_to_hash(self, indices) -> None:
×
363
        """
364
        Hash indices and replace the indices with those hash values.
365
        """
366
        self._frames.index = self._frames[indices].apply(
×
367
            lambda x: hash(tuple(x)), axis=1
368
        )
369

370
    def aggregate(self, method: str) -> None:
×
371
        """
372
        Aggregate batch based on method.
373
        Methods can be sum, count, min, max, mean
374

375
        Arguments:
376
            method: string with one of the five above options
377
        """
378
        self._frames = self._frames.agg([method])
×
379

380
    def empty(self):
×
381
        """Checks if the batch is empty
382
        Returns:
383
            True if the batch_size == 0
384
        """
385
        return len(self) == 0
×
386

387
    def unnest(self, cols: List[str] = None) -> None:
×
388
        """
389
        Unnest columns and drop columns with no data
390
        """
391
        if cols is None:
×
392
            cols = list(self.columns)
×
393
        self._frames = self._frames.explode(cols)
×
394
        self._frames.dropna(inplace=True)
×
395

396
    def reverse(self) -> None:
×
397
        """Reverses dataframe"""
398
        self._frames = self._frames[::-1]
×
399
        self._frames.reset_index(drop=True, inplace=True)
×
400

401
    def drop_zero(self, outcomes: Batch) -> None:
×
402
        """Drop all columns with corresponding outcomes containing zero."""
403
        self._frames = self._frames[(outcomes._frames > 0).to_numpy()]
×
404

405
    def reset_index(self):
×
406
        """Resets the index of the data frame in the batch"""
407
        self._frames.reset_index(drop=True, inplace=True)
×
408

409
    def modify_column_alias(self, alias: Union[Alias, str]) -> None:
×
410
        # a, b, c -> table1.a, table1.b, table1.c
411
        # t1.a -> t2.a
412
        if isinstance(alias, str):
×
413
            alias = Alias(alias)
×
414
        new_col_names = []
×
415
        if len(alias.col_names):
×
416
            if len(self.columns) != len(alias.col_names):
×
417
                err_msg = (
×
418
                    f"Expected {len(alias.col_names)} columns {alias.col_names},"
419
                    f"got {len(self.columns)} columns {self.columns}."
420
                )
421
                raise RuntimeError(err_msg)
422
            new_col_names = [
×
423
                "{}.{}".format(alias.alias_name, col_name)
424
                for col_name in alias.col_names
425
            ]
426
        else:
427
            for col_name in self.columns:
×
428
                if "." in str(col_name):
×
429
                    new_col_names.append(
×
430
                        "{}.{}".format(alias.alias_name, str(col_name).split(".")[1])
431
                    )
432
                else:
433
                    new_col_names.append("{}.{}".format(alias.alias_name, col_name))
×
434

435
        self._frames.columns = new_col_names
×
436

437
    def drop_column_alias(self) -> None:
×
438
        # table1.a, table1.b, table1.c -> a, b, c
439
        new_col_names = []
×
440
        for col_name in self.columns:
×
441
            if isinstance(col_name, str) and "." in col_name:
×
442
                new_col_names.append(col_name.split(".")[1])
×
443
            else:
444
                new_col_names.append(col_name)
×
445

446
        self._frames.columns = new_col_names
×
447

448
    def to_numpy(self):
×
449
        return self._frames.to_numpy()
×
450

451
    def rename(self, columns) -> None:
×
452
        "Rename column names"
453
        self._frames.rename(columns=columns, inplace=True)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc