Build: georgia-tech-db / eva / #725 (pending completion)
Event: push
CI service: circle-ci
Committer: web-flow

Commit: chore: reducing coverage loss (#619)

* adding delete operation

* Adding Insert Statement

* checkpoint

* supporting multiple entries

* implemented for structured data error

* adding parser visitor for delete

* delete executor

* delete plan and rules

* adding delete to plan executor

* change position of LogicalDelete

* logical delimiter

* delete test case

* adding test case

* adding test case

* adding delete testcase

* adding predicate to delete executor

* adding delete to Image storage

* bug fix in delete

* fixing testcase

* adding test case for insert statement

* remove order_by from statement_binder.py

* better variable names, using Batch

* error message for insert

* removing order_by and limit from delete

* remove order_by and limit

* use f-string

* adding to changelog

* removing commit messages

* formatting

* fixing comments

* formatting

* eva insert f32 values

* fix: should delete range

* delete multiple rows

* udf bootstrap

* try to run tests in parallel

* minor fix for ray to work

* ray fixes

---------

Co-authored-by: Aryan-Rajoria <aryanrajoria1003@gmail.com>
Co-authored-by: Gaurav <gaurav21776@gmail.com>

236 of 236 new or added lines in 53 files covered. (100.0%)

7003 of 9005 relevant lines covered (77.77%)

0.78 hits per line

Source File: /eva/storage/sqlite_storage_engine.py (24.58% of lines covered)
# coding=utf-8
# Copyright 2018-2022 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, Iterator, List

import numpy as np
import pandas as pd
from sqlalchemy import Table, and_, inspect

from eva.catalog.catalog_type import ColumnType
from eva.catalog.models.base_model import BaseModel
from eva.catalog.models.column_catalog import ColumnCatalogEntry
from eva.catalog.models.table_catalog import TableCatalogEntry
from eva.catalog.schema_utils import SchemaUtils
from eva.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig
from eva.models.storage.batch import Batch
from eva.parser.table_ref import TableInfo
from eva.storage.abstract_storage_engine import AbstractStorageEngine
from eva.utils.generic_utils import PickleSerializer, get_size
from eva.utils.logging_manager import logger

# Leveraging Dynamic schema in SQLAlchemy
# https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html


class SQLStorageEngine(AbstractStorageEngine):
    def __init__(self):
        """
        Grab the existing sql session
        """
        self._sql_session = SQLConfig().session
        self._sql_engine = SQLConfig().engine
        self._serializer = PickleSerializer

    def _dict_to_sql_row(self, dict_row: dict, columns: List[ColumnCatalogEntry]):
        # Serialize numpy data
        for col in columns:
            if col.type == ColumnType.NDARRAY:
                dict_row[col.name] = self._serializer.serialize(dict_row[col.name])
            elif isinstance(dict_row[col.name], (np.generic,)):
                # SqlAlchemy does not consume numpy generic data types;
                # convert numpy datatypes to python generic datatypes using tolist()
                # e.g. np.int64 -> int
                # https://stackoverflow.com/a/53067954
                dict_row[col.name] = dict_row[col.name].tolist()
        return dict_row

    def _sql_row_to_dict(self, sql_row: tuple, columns: List[ColumnCatalogEntry]):
        # Deserialize numpy data
        dict_row = {}
        for idx, col in enumerate(columns):
            if col.type == ColumnType.NDARRAY:
                dict_row[col.name] = self._serializer.deserialize(sql_row[idx])
            else:
                dict_row[col.name] = sql_row[idx]
        return dict_row

    def _try_loading_table_via_reflection(self, table_name: str):
        metadata_obj = BaseModel.metadata
        if table_name in metadata_obj.tables:
            return metadata_obj.tables[table_name]
        # reflection
        insp = inspect(self._sql_engine)
        if insp.has_table(table_name):
            table = Table(table_name, metadata_obj)
            insp.reflect_table(table, None)
            return table
        else:
            err_msg = f"No table found with name {table_name}"
            logger.exception(err_msg)
            raise Exception(err_msg)

    def create(self, table: TableCatalogEntry, **kwargs):
        """
        Create an empty table in sql.
        It dynamically constructs the schema in SQLAlchemy
        to create the table.
        """
        attr_dict = {"__tablename__": table.name}

        # During table creation, assume row_id is automatically handled by
        # the sqlalchemy engine.
        table_columns = [col for col in table.columns if col.name != IDENTIFIER_COLUMN]
        sqlalchemy_schema = SchemaUtils.xform_to_sqlalchemy_schema(table_columns)

        attr_dict.update(sqlalchemy_schema)
        # dynamic schema generation
        # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
        new_table = type("__placeholder_class_name", (BaseModel,), attr_dict)()
        BaseModel.metadata.tables[table.name].create(self._sql_engine)
        self._sql_session.commit()
        return new_table

    def drop(self, table: TableCatalogEntry):
        try:
            table_to_remove = self._try_loading_table_via_reflection(table.name)
            table_to_remove.drop()
            # In-memory metadata does not automatically sync with the database,
            # therefore manually remove the table from the in-memory metadata
            # https://github.com/sqlalchemy/sqlalchemy/issues/5112
            BaseModel.metadata.remove(table_to_remove)
            self._sql_session.commit()
        except Exception as e:
            err_msg = f"Failed to drop the table {table.name} with Exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg)

    def write(self, table: TableCatalogEntry, rows: Batch):
        """
        Write rows into the sql table.

        Arguments:
            table: table metadata object to write into
            rows : batch to be persisted in the storage.
        """
        try:
            table_to_update = self._try_loading_table_via_reflection(table.name)
            columns = rows.frames.keys()
            data = []

            # During table writes, assume row_id is automatically handled by
            # the sqlalchemy engine. Another assumption we make here is that
            # the incoming data does not need to include row_id.
            table_columns = [
                col for col in table.columns if col.name != IDENTIFIER_COLUMN
            ]

            # ToDo: validate the data type before inserting into the table
            for record in rows.frames.values:
                row_data = {col: record[idx] for idx, col in enumerate(columns)}
                data.append(self._dict_to_sql_row(row_data, table_columns))
            self._sql_engine.execute(table_to_update.insert(), data)
            self._sql_session.commit()
        except Exception as e:
            err_msg = f"Failed to update the table {table.name} with exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg)

    def read(
        self, table: TableCatalogEntry, batch_mem_size: int = 30000000
    ) -> Iterator[Batch]:
        """
        Reads the table and returns a batch iterator over the tuples.

        Arguments:
            table: table metadata object of the table to read
            batch_mem_size (int): memory size of the batch read from storage
        Return:
            Iterator of Batch read.
        """
        try:
            table_to_read = self._try_loading_table_via_reflection(table.name)
            result = self._sql_engine.execute(table_to_read.select())
            data_batch = []
            row_size = None
            for row in result:
                # ToDo: verify that the order of columns in row matches table.columns
                # For table reads, we provide row_id so that the user can also
                # retrieve row_id from the table.
                data_batch.append(self._sql_row_to_dict(row, table.columns))
                if row_size is None:
                    row_size = get_size(data_batch)
                if len(data_batch) * row_size >= batch_mem_size:
                    yield Batch(pd.DataFrame(data_batch))
                    data_batch = []
            if data_batch:
                yield Batch(pd.DataFrame(data_batch))

        except Exception as e:
            err_msg = f"Failed to read the table {table.name} with exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg)

    def delete(self, table: TableCatalogEntry, where_clause: Dict[str, Any]):
        """Delete tuples from the table where rows satisfy the where_clause.
        The current implementation only handles equality predicates.

        Arguments:
            table: table metadata object of the table
            where_clause (Dict[str, Any]): where clause used to find the tuples
            to remove. Each key should be a column name and each value the
            required column value; the conditions are assumed to be equalities.
        """
        try:
            table_to_delete_from = self._try_loading_table_via_reflection(table.name)
            table_columns = [
                col.name
                for col in table_to_delete_from.columns
                if col.name != "_row_id"
            ]
            filter_clause = []
            # verify the where clause and convert it to a sqlalchemy-supported filter
            # https://stackoverflow.com/questions/34026210/where-filter-from-table-object-using-a-dictionary-or-kwargs
            for column, value in where_clause.items():
                if column not in table_columns:
                    raise Exception(
                        f"where_clause contains a column {column} not in the table {table_to_delete_from}"
                    )
                filter_clause.append(table_to_delete_from.columns[column] == value)

            d = table_to_delete_from.delete().where(and_(*filter_clause))
            self._sql_engine.execute(d)
            self._sql_session.commit()
        except Exception as e:
            err_msg = (
                f"Failed to delete from the table {table.name} with exception {str(e)}"
            )
            logger.exception(err_msg)
            raise Exception(err_msg)

    def rename(self, old_table: TableCatalogEntry, new_name: TableInfo):
        raise Exception("Rename not supported for structured data table")