• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 17d27eeb-d1e1-4c3b-a59c-b789a33e18ef

pending completion
17d27eeb-d1e1-4c3b-a59c-b789a33e18ef

Pull #587

circle-ci

jarulraj
checkpoint
Pull Request #587: feat: Delete and Insert operators for structured data

227 of 227 new or added lines in 22 files covered. (100.0%)

8477 of 9086 relevant lines covered (93.3%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.0
/eva/executor/insert_executor.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import pandas as pd
1✔
16

17
from eva.catalog.catalog_manager import CatalogManager
1✔
18
from eva.catalog.catalog_type import TableType
1✔
19
from eva.catalog.models.table_catalog import TableCatalogEntry
1✔
20
from eva.executor.abstract_executor import AbstractExecutor
1✔
21
from eva.executor.executor_utils import ExecutorError
1✔
22
from eva.models.storage.batch import Batch
1✔
23
from eva.plan_nodes.insert_plan import InsertPlan
1✔
24
from eva.storage.abstract_storage_engine import AbstractStorageEngine
1✔
25
from eva.storage.storage_engine import StorageEngine
1✔
26
from eva.utils.logging_manager import logger
1✔
27

28

29
class InsertExecutor(AbstractExecutor):
1✔
30
    def __init__(self, node: InsertPlan):
1✔
31
        super().__init__(node)
1✔
32
        self.catalog = CatalogManager()
1✔
33

34
    def validate(self):
1✔
35
        pass
×
36

37
    def exec(self):
1✔
38
        storage_engine = None
1✔
39
        table_catalog_entry = None
1✔
40
        try:
1✔
41
            # Get catalog entry
42
            table_name = self.node.table_ref.table.table_name
1✔
43
            database_name = self.node.table_ref.table.database_name
1✔
44
            table_catalog_entry = self.catalog.get_table_catalog_entry(
1✔
45
                table_name, database_name
46
            )
47

48
            # Implemented only for STRUCTURED_DATA
49
            if table_catalog_entry.table_type != TableType.STRUCTURED_DATA:
1✔
50
                raise NotImplementedError("INSERT only implemented for structured data")
×
51

52
            values_to_insert = []
1✔
53
            for i in self.node.value_list:
1✔
54
                values_to_insert.append(i.value)
1✔
55
            tuple_to_insert = tuple(values_to_insert)
1✔
56
            columns_to_insert = []
1✔
57
            for i in self.node.column_list:
1✔
58
                columns_to_insert.append(i.col_name)
1✔
59

60
            # Adding all values to Batch for insert
61
            logger.info(values_to_insert)
1✔
62
            logger.info(columns_to_insert)
1✔
63
            dataframe = pd.DataFrame([tuple_to_insert], columns=columns_to_insert)
1✔
64
            batch = Batch(dataframe)
1✔
65

66
            storage_engine = StorageEngine.factory(table_catalog_entry)
1✔
67
            storage_engine.write(table_catalog_entry, batch)
1✔
68
        except Exception as e:
×
69
            err_msg = f"Insert failed: encountered unexpected error {str(e)}"
×
70
            logger.error(err_msg)
×
71
            raise ExecutorError(err_msg)
×
72
        else:
73
            yield Batch(
1✔
74
                pd.DataFrame([f"Number of rows loaded: {str(len(values_to_insert))}"])
75
            )
76

77
    def _rollback_load(
1✔
78
        self,
79
        storage_engine: AbstractStorageEngine,
80
        table_obj: TableCatalogEntry,
81
        do_create: bool,
82
    ):
83
        try:
×
84
            if do_create:
×
85
                storage_engine.drop(table_obj)
×
86
        except Exception as e:
×
87
            logger.exception(
×
88
                f"Unexpected Exception {e} occured while rolling back. This is bad as the {self.media_type.name} table can be in a corrupt state. Please verify the table {table_obj} for correctness."
89
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc