• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #837

19 Oct 2023 09:02AM UTC coverage: 0.0% (-78.6%) from 78.632%
#837

push

circle-ci

Andy Xu
Add CostEntry

0 of 12416 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/storage/sqlite_storage_engine.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator, List
×
16

17
import numpy as np
×
18
import pandas as pd
×
19
from sqlalchemy import Table, inspect
×
20
from sqlalchemy.sql.expression import ColumnElement
×
21

22
from evadb.catalog.catalog_type import ColumnType
×
23
from evadb.catalog.models.base_model import BaseModel
×
24
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
×
25
from evadb.catalog.models.table_catalog import TableCatalogEntry
×
26
from evadb.catalog.schema_utils import SchemaUtils
×
27
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, ROW_NUM_COLUMN
×
28
from evadb.database import EvaDBDatabase
×
29
from evadb.models.storage.batch import Batch
×
30
from evadb.parser.table_ref import TableInfo
×
31
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
×
32
from evadb.utils.generic_utils import PickleSerializer, rebatch
×
33
from evadb.utils.logging_manager import logger
×
34

35
# Leveraging Dynamic schema in SQLAlchemy
36
# https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
37

38

39
class SQLStorageEngine(AbstractStorageEngine):
×
40
    def __init__(self, db: EvaDBDatabase):
×
41
        """
42
        Grab the existing sql session
43
        """
44
        super().__init__(db)
×
45
        self._sql_session = db.catalog().sql_config.session
×
46
        self._sql_engine = db.catalog().sql_config.engine
×
47
        self._serializer = PickleSerializer
×
48

49
    def _dict_to_sql_row(self, dict_row: dict, columns: List[ColumnCatalogEntry]):
×
50
        # Serialize numpy data
51
        for col in columns:
×
52
            if col.type == ColumnType.NDARRAY:
×
53
                dict_row[col.name] = self._serializer.serialize(dict_row[col.name])
×
54
            elif isinstance(dict_row[col.name], (np.generic,)):
×
55
                # Sqlalchemy does not consume numpy generic data types
56
                # convert numpy datatype to python generic datatype using tolist()
57
                # eg. np.int64 -> int
58
                # https://stackoverflow.com/a/53067954
59
                dict_row[col.name] = dict_row[col.name].tolist()
×
60
        return dict_row
×
61

62
    def _deserialize_sql_row(self, sql_row: dict, columns: List[ColumnCatalogEntry]):
×
63
        # Deserialize numpy data
64
        dict_row = {}
×
65
        for idx, col in enumerate(columns):
×
66
            if col.type == ColumnType.NDARRAY:
×
67
                dict_row[col.name] = self._serializer.deserialize(sql_row[col.name])
×
68
            else:
69
                dict_row[col.name] = sql_row[col.name]
×
70
        dict_row[ROW_NUM_COLUMN] = dict_row[IDENTIFIER_COLUMN]
×
71
        return dict_row
×
72

73
    def _try_loading_table_via_reflection(self, table_name: str):
×
74
        metadata_obj = BaseModel.metadata
×
75
        if table_name in metadata_obj.tables:
×
76
            return metadata_obj.tables[table_name]
×
77
        # reflection
78
        insp = inspect(self._sql_engine)
×
79
        if insp.has_table(table_name):
×
80
            table = Table(table_name, metadata_obj)
×
81
            insp.reflect_table(table, None)
×
82
            return table
×
83
        else:
84
            err_msg = f"No table found with name {table_name}"
×
85
            logger.exception(err_msg)
86
            raise Exception(err_msg)
87

88
    def create(self, table: TableCatalogEntry, **kwargs):
×
89
        """
90
        Create an empty table in sql.
91
        It dynamically constructs schema in sqlaclchemy
92
        to create the table
93
        """
94
        attr_dict = {"__tablename__": table.name}
×
95

96
        # During table creation, assume row_id is automatically handled by
97
        # the sqlalchemy engine.
98
        table_columns = [
×
99
            col
100
            for col in table.columns
101
            if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN)
102
        ]
103
        sqlalchemy_schema = SchemaUtils.xform_to_sqlalchemy_schema(table_columns)
×
104
        attr_dict.update(sqlalchemy_schema)
×
105

106
        insp = inspect(self._sql_engine)
×
107
        if insp.has_table(table.name):
×
108
            logger.warning("Table {table.name} already exists")
×
109
            return BaseModel.metadata.tables[table.name]
×
110

111
        # dynamic schema generation
112
        # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
113
        new_table = type(
×
114
            f"__placeholder_class_name__{table.name}", (BaseModel,), attr_dict
115
        )()
116
        table = BaseModel.metadata.tables[table.name]
×
117

118
        if not insp.has_table(table.name):
×
119
            BaseModel.metadata.tables[table.name].create(self._sql_engine)
×
120
        self._sql_session.commit()
×
121
        return new_table
×
122

123
    def drop(self, table: TableCatalogEntry):
×
124
        try:
×
125
            table_to_remove = self._try_loading_table_via_reflection(table.name)
×
126
            insp = inspect(self._sql_engine)
×
127
            if insp.has_table(table_to_remove.name):
×
128
                table_to_remove.drop(self._sql_engine)
×
129
                # In-memory metadata does not automatically sync with the database
130
                # therefore manually removing the table from the in-memory metadata
131
                # https://github.com/sqlalchemy/sqlalchemy/issues/5112
132
                BaseModel.metadata.remove(table_to_remove)
×
133
            self._sql_session.commit()
×
134
        except Exception as e:
135
            err_msg = f"Failed to drop the table {table.name} with Exception {str(e)}"
136
            logger.exception(err_msg)
137
            raise Exception(err_msg)
138

139
    def write(self, table: TableCatalogEntry, rows: Batch):
×
140
        """
141
        Write rows into the sql table.
142

143
        Arguments:
144
            table: table metadata object to write into
145
            rows : batch to be persisted in the storage.
146
        """
147
        try:
×
148
            table_to_update = self._try_loading_table_via_reflection(table.name)
×
149
            columns = rows.frames.keys()
×
150
            data = []
×
151

152
            # During table writes, assume row_id is automatically handled by
153
            # the sqlalchemy engine. Another assumption we make here is the
154
            # updated data need not to take care of row_id.
155
            table_columns = [
×
156
                col
157
                for col in table.columns
158
                if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN)
159
            ]
160

161
            # Todo: validate the data type before inserting into the table
162
            for record in rows.frames.values:
×
163
                row_data = {
×
164
                    col: record[idx]
165
                    for idx, col in enumerate(columns)
166
                    if col != ROW_NUM_COLUMN
167
                }
168
                data.append(self._dict_to_sql_row(row_data, table_columns))
×
169
            self._sql_session.execute(table_to_update.insert(), data)
×
170
            self._sql_session.commit()
×
171
        except Exception as e:
172
            err_msg = f"Failed to update the table {table.name} with exception {str(e)}"
173
            logger.exception(err_msg)
174
            raise Exception(err_msg)
175

176
    def read(
×
177
        self, table: TableCatalogEntry, batch_mem_size: int = 30000000
178
    ) -> Iterator[Batch]:
179
        """
180
        Reads the table and return a batch iterator for the
181
        tuples.
182

183
        Argument:
184
            table: table metadata object of the table to read
185
            batch_mem_size (int): memory size of the batch read from storage
186
        Return:
187
            Iterator of Batch read.
188
        """
189
        try:
×
190
            table_to_read = self._try_loading_table_via_reflection(table.name)
×
191
            result = self._sql_session.execute(table_to_read.select()).fetchall()
×
192
            result_iter = (
×
193
                self._deserialize_sql_row(row._asdict(), table.columns)
194
                for row in result
195
            )
196
            for df in rebatch(result_iter, batch_mem_size):
×
197
                yield Batch(pd.DataFrame(df))
×
198
        except Exception as e:
199
            err_msg = f"Failed to read the table {table.name} with exception {str(e)}"
200
            logger.exception(err_msg)
201
            raise Exception(err_msg)
202

203
    def delete(
×
204
        self, table: TableCatalogEntry, sqlalchemy_filter_clause: "ColumnElement[bool]"
205
    ):
206
        """Delete tuples from the table where rows satisfy the where_clause.
207
        The current implementation only handles equality predicates.
208

209
        Argument:
210
            table: table metadata object of the table
211
            where_clause: clause used to find the tuples to remove.
212
        """
213
        try:
×
214
            table_to_delete_from = self._try_loading_table_via_reflection(table.name)
×
215
            d = table_to_delete_from.delete().where(sqlalchemy_filter_clause)
×
216
            self._sql_session.execute(d)
×
217
            self._sql_session.commit()
×
218
        except Exception as e:
219
            err_msg = (
220
                f"Failed to delete from the table {table.name} with exception {str(e)}"
221
            )
222
            logger.exception(err_msg)
223
            raise Exception(err_msg)
224

225
    def rename(self, old_table: TableCatalogEntry, new_name: TableInfo):
×
226
        raise Exception("Rename not supported for structured data table")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc