• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 1faaf654-86a1-4547-9fa2-be3ff96452d7

pending completion
1faaf654-86a1-4547-9fa2-be3ff96452d7

Pull #587

circle-ci

jarulraj
checkpoint
Pull Request #587: feat: Delete and Insert operators for structured data

219 of 219 new or added lines in 17 files covered. (100.0%)

8487 of 9096 relevant lines covered (93.3%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.02
/eva/executor/delete_executor.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Generator, Iterator
1✔
16

17
import pandas as pd
1✔
18

19
from eva.catalog.catalog_manager import CatalogManager
1✔
20
from eva.catalog.catalog_type import TableType
1✔
21
from eva.configuration.configuration_manager import ConfigurationManager
1✔
22
from eva.executor.abstract_executor import AbstractExecutor
1✔
23
from eva.executor.executor_utils import ExecutorError, apply_predicate
1✔
24
from eva.models.storage.batch import Batch
1✔
25
from eva.plan_nodes.project_plan import ProjectPlan
1✔
26
from eva.storage.storage_engine import StorageEngine
1✔
27
from eva.utils.logging_manager import logger
1✔
28

29

30
class DeleteExecutor(AbstractExecutor):
    """Executor for DELETE statements on structured-data tables.

    The executor reads the target table through its storage engine, filters
    the rows with the plan node's WHERE predicate and hands the matching value
    of the predicate's column to ``storage_engine.delete``.

    Only ``TableType.STRUCTURED_DATA`` tables are supported; every other table
    type is rejected.
    """

    def __init__(self, node: ProjectPlan):
        """Store the plan node, its WHERE predicate and a catalog handle.

        Args:
            node: plan node of the DELETE statement. It must expose
                ``where_clause`` (read here) and ``table_ref`` (read in
                ``exec``).
        """
        # NOTE(review): the annotation says ProjectPlan, but the attributes used
        # look like those of a delete-specific plan node -- confirm and retype.
        super().__init__(node)
        self.predicate = node.where_clause
        self.catalog = CatalogManager()

    def validate(self):
        """No validation is performed for this executor."""
        pass

    def exec(self, **kwargs) -> Iterator[Batch]:
        """Delete the rows selected by the predicate and yield a status batch.

        Yields:
            A single ``Batch`` wrapping a one-cell DataFrame with the text
            ``"Deleted row"``. It is also yielded when the predicate matches
            no row, in which case the storage engine is not called.

        Raises:
            ExecutorError: wraps any exception raised while executing,
                including the ``NotImplementedError`` raised for tables that
                are not structured data.
        """
        try:
            table_catalog = self.node.table_ref.table.table_obj

            # Reject unsupported tables up front instead of failing later on
            # an unrelated error.
            if table_catalog.table_type != TableType.STRUCTURED_DATA:
                raise NotImplementedError("DELETE only implemented for structured data")

            # Fall back to 30000000 when the configuration has no usable value.
            batch_mem = (
                ConfigurationManager().get_value("executor", "batch_mem_size")
                or 30000000
            )
            storage_engine = StorageEngine.factory(table_catalog)

            # NOTE(review): only the first batch produced by the storage engine
            # is examined; rows in later batches are never considered -- confirm
            # whether all batches should be merged before filtering.
            del_batch = list(storage_engine.read(table_catalog, batch_mem))[0]

            # The predicate is evaluated against "<table>.<column>" names, so
            # the columns are temporarily qualified with the lower-cased table
            # name. "_row_id" stays unqualified and is placed first.
            # NOTE(review): this assumes "_row_id" is the first column of the
            # batch -- confirm against the storage engine.
            original_column_names = list(del_batch.frames.columns)
            table_prefix = table_catalog.name.lower()
            qualified_names = ["_row_id"]
            qualified_names.extend(
                f"{table_prefix}.{name}"
                for name in original_column_names
                if name != "_row_id"
            )
            del_batch.frames.columns = qualified_names

            del_batch = apply_predicate(del_batch, self.predicate)

            # Restore the storage-level column names before talking to the
            # storage engine again.
            del_batch.frames.columns = original_column_names

            # A predicate that matches nothing makes the DELETE a no-op rather
            # than an out-of-bounds error on iloc[0].
            if not del_batch.frames.empty:
                # The column is taken from the first child of the predicate.
                predicate_column = f"{self.predicate.children[0].col_name}"
                table_needed = del_batch.frames[[predicate_column]]
                # NOTE(review): only the first matching row is passed on --
                # confirm how storage_engine.delete interprets it.
                storage_engine.delete(table_catalog, table_needed.iloc[0])

            yield Batch(pd.DataFrame(["Deleted row"]))

        except Exception as e:
            logger.error(e)
            # Chain explicitly so the original traceback stays attached.
            raise ExecutorError(e) from e

    def __call__(self, **kwargs) -> Generator[Batch, None, None]:
        """Delegate to ``exec`` so the executor can be invoked as a callable."""
        yield from self.exec(**kwargs)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc