• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 20a9a0f9-edcc-437c-815d-bcc1a2d22b17

10 Nov 2023 04:50AM UTC coverage: 66.644% (-10.2%) from 76.812%
20a9a0f9-edcc-437c-815d-bcc1a2d22b17

push

circleci

americast
update docs

0 of 1 new or added line in 1 file covered. (0.0%)

1354 existing lines in 113 files now uncovered.

8767 of 13155 relevant lines covered (66.64%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.46
/evadb/storage/sqlite_storage_engine.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator, List
1✔
16

17
import numpy as np
1✔
18
import pandas as pd
1✔
19
from sqlalchemy import Table, inspect
1✔
20
from sqlalchemy.sql.expression import ColumnElement
1✔
21

22
from evadb.catalog.catalog_type import ColumnType
1✔
23
from evadb.catalog.models.base_model import BaseModel
1✔
24
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
1✔
25
from evadb.catalog.models.table_catalog import TableCatalogEntry
1✔
26
from evadb.catalog.schema_utils import SchemaUtils
1✔
27
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, ROW_NUM_COLUMN
1✔
28
from evadb.database import EvaDBDatabase
1✔
29
from evadb.models.storage.batch import Batch
1✔
30
from evadb.parser.table_ref import TableInfo
1✔
31
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
1✔
32
from evadb.utils.generic_utils import PickleSerializer, rebatch
1✔
33
from evadb.utils.logging_manager import logger
1✔
34

35
# Leveraging Dynamic schema in SQLAlchemy
36
# https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
37

38

39
class SQLStorageEngine(AbstractStorageEngine):
    """Storage engine that keeps structured tables in a SQL database.

    Tables are declared dynamically on top of ``BaseModel`` from catalog
    column entries and are accessed through the SQLAlchemy session and
    engine exposed by the catalog's ``sql_config``.
    """

    def __init__(self, db: EvaDBDatabase):
        """
        Grab the existing sql session and engine from the catalog.

        Arguments:
            db: database handle whose catalog provides the sql configuration
        """
        super().__init__(db)
        self._sql_session = db.catalog().sql_config.session
        self._sql_engine = db.catalog().sql_config.engine
        self._serializer = PickleSerializer

    def _dict_to_sql_row(self, dict_row: dict, columns: List[ColumnCatalogEntry]):
        """
        Convert the values of a row so that they can be handed to sqlalchemy.

        The conversion happens in place: ``dict_row`` itself is modified and
        returned.

        Arguments:
            dict_row: mapping from column name to value for a single row
            columns: catalog entries of the columns to convert; every column
                name must be a key of ``dict_row``
        Return:
            The same ``dict_row`` object with converted values.
        """
        for col in columns:
            value = dict_row[col.name]
            if col.type == ColumnType.NDARRAY:
                # Serialize numpy data
                dict_row[col.name] = self._serializer.serialize(value)
            elif isinstance(value, np.generic):
                # Sqlalchemy does not consume numpy generic data types
                # convert numpy datatype to python generic datatype using tolist()
                # eg. np.int64 -> int
                # https://stackoverflow.com/a/53067954
                dict_row[col.name] = value.tolist()
        return dict_row

    def _deserialize_sql_row(self, sql_row: dict, columns: List[ColumnCatalogEntry]):
        """
        Build a row dictionary from a row fetched from the sql table.

        Arguments:
            sql_row: mapping from column name to the stored value
            columns: catalog entries of the columns to extract; must include
                the IDENTIFIER_COLUMN since its value is copied into
                ROW_NUM_COLUMN
        Return:
            A new dictionary holding the (deserialized) value of every column
            in ``columns`` plus the ROW_NUM_COLUMN entry.
        """
        dict_row = {}
        for col in columns:
            value = sql_row[col.name]
            if col.type == ColumnType.NDARRAY:
                # Deserialize numpy data
                value = self._serializer.deserialize(value)
            dict_row[col.name] = value
        dict_row[ROW_NUM_COLUMN] = dict_row[IDENTIFIER_COLUMN]
        return dict_row

    def _try_loading_table_via_reflection(self, table_name: str):
        """
        Return the sqlalchemy table registered under ``table_name``.

        The in-memory metadata of ``BaseModel`` is consulted first; if the
        table is not registered there it is reflected from the database.

        Arguments:
            table_name: name of the table to load
        Return:
            The sqlalchemy ``Table`` object.
        Raises:
            Exception: if the database has no table with that name.
        """
        metadata_obj = BaseModel.metadata
        if table_name in metadata_obj.tables:
            return metadata_obj.tables[table_name]

        # reflection
        insp = inspect(self._sql_engine)
        if not insp.has_table(table_name):
            err_msg = f"No table found with name {table_name}"
            # No exception is being handled at this point, so log a plain
            # error instead of logger.exception (which expects an active
            # exception to attach a traceback).
            logger.error(err_msg)
            raise Exception(err_msg)

        table = Table(table_name, metadata_obj)
        insp.reflect_table(table, None)
        return table

    def create(self, table: TableCatalogEntry, **kwargs):
        """
        Create an empty table in sql.
        It dynamically constructs schema in sqlalchemy
        to create the table

        Arguments:
            table: table metadata object of the table to create
        Return:
            An instance of the dynamically generated model class when the
            table is created, or the existing sqlalchemy table when a table
            with the same name is already present in the database.
        """
        attr_dict = {"__tablename__": table.name}

        # During table creation, assume row_id is automatically handled by
        # the sqlalchemy engine.
        table_columns = [
            col
            for col in table.columns
            if col.name not in (IDENTIFIER_COLUMN, ROW_NUM_COLUMN)
        ]
        sqlalchemy_schema = SchemaUtils.xform_to_sqlalchemy_schema(table_columns)
        attr_dict.update(sqlalchemy_schema)

        insp = inspect(self._sql_engine)
        if insp.has_table(table.name):
            logger.warning(f"Table {table.name} already exists")
            # The table may exist in the database without being registered
            # in the in-memory metadata; fall back to reflection in that
            # case instead of failing with a KeyError.
            return self._try_loading_table_via_reflection(table.name)

        # dynamic schema generation
        # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
        new_table = type(
            f"__placeholder_class_name__{table.name}", (BaseModel,), attr_dict
        )()
        # Declaring the class above registers the table in the metadata.
        sql_table = BaseModel.metadata.tables[table.name]

        # Re-checked right before issuing CREATE TABLE.
        if not insp.has_table(sql_table.name):
            sql_table.create(self._sql_engine)
        self._sql_session.commit()
        return new_table

    def drop(self, table: TableCatalogEntry):
        """
        Drop the table from the database and from the in-memory metadata.

        Arguments:
            table: table metadata object of the table to drop
        Raises:
            Exception: if the table cannot be loaded or dropped.
        """
        try:
            table_to_remove = self._try_loading_table_via_reflection(table.name)
            insp = inspect(self._sql_engine)
            if insp.has_table(table_to_remove.name):
                table_to_remove.drop(self._sql_engine)
                # In-memory metadata does not automatically sync with the database
                # therefore manually removing the table from the in-memory metadata
                # https://github.com/sqlalchemy/sqlalchemy/issues/5112
                BaseModel.metadata.remove(table_to_remove)
            self._sql_session.commit()
        except Exception as e:
            err_msg = f"Failed to drop the table {table.name} with Exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e

    def write(self, table: TableCatalogEntry, rows: Batch):
        """
        Write rows into the sql table.

        Arguments:
            table: table metadata object to write into
            rows : batch to be persisted in the storage.
        Raises:
            Exception: if the table cannot be loaded or the insert fails.
        """
        try:
            table_to_update = self._try_loading_table_via_reflection(table.name)
            frame = rows.frames

            # During table writes, assume row_id is automatically handled by
            # the sqlalchemy engine. Another assumption we make here is the
            # updated data need not to take care of row_id.
            table_columns = [
                col
                for col in table.columns
                if col.name not in (IDENTIFIER_COLUMN, ROW_NUM_COLUMN)
            ]

            # Positions and names of the batch columns that are persisted;
            # computed once instead of once per record.
            kept_columns = [
                (idx, col)
                for idx, col in enumerate(frame.keys())
                if col != ROW_NUM_COLUMN
            ]

            # Todo: validate the data type before inserting into the table
            data = [
                self._dict_to_sql_row(
                    {col: record[idx] for idx, col in kept_columns}, table_columns
                )
                for record in frame.values
            ]

            # Nothing to insert for an empty batch; avoid issuing an INSERT
            # with an empty parameter list.
            if data:
                self._sql_session.execute(table_to_update.insert(), data)
            self._sql_session.commit()
        except Exception as e:
            err_msg = f"Failed to update the table {table.name} with exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e

    def read(
        self, table: TableCatalogEntry, batch_mem_size: int = 30000000
    ) -> Iterator[Batch]:
        """
        Reads the table and return a batch iterator for the
        tuples.

        Argument:
            table: table metadata object of the table to read
            batch_mem_size (int): memory size of the batch read from storage
        Return:
            Iterator of Batch read.
        Raises:
            Exception: if the table cannot be loaded or read.
        """
        try:
            table_to_read = self._try_loading_table_via_reflection(table.name)
            # All rows are fetched up front; deserialization happens lazily
            # while rebatching.
            result = self._sql_session.execute(table_to_read.select()).fetchall()
            result_iter = (
                self._deserialize_sql_row(row._asdict(), table.columns)
                for row in result
            )
            for df in rebatch(result_iter, batch_mem_size):
                yield Batch(pd.DataFrame(df))
        except Exception as e:
            err_msg = f"Failed to read the table {table.name} with exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e

    def delete(
        self, table: TableCatalogEntry, sqlalchemy_filter_clause: "ColumnElement[bool]"
    ):
        """Delete tuples from the table where rows satisfy the filter clause.

        NOTE(review): earlier documentation stated that only equality
        predicates are handled; this method applies the given clause as is
        and does not enforce that restriction itself.

        Argument:
            table: table metadata object of the table
            sqlalchemy_filter_clause: sqlalchemy clause used to find the
                tuples to remove.
        Raises:
            Exception: if the table cannot be loaded or the delete fails.
        """
        try:
            table_to_delete_from = self._try_loading_table_via_reflection(table.name)
            d = table_to_delete_from.delete().where(sqlalchemy_filter_clause)
            self._sql_session.execute(d)
            self._sql_session.commit()
        except Exception as e:
            err_msg = (
                f"Failed to delete from the table {table.name} with exception {str(e)}"
            )
            logger.exception(err_msg)
            raise Exception(err_msg) from e

    def rename(self, old_table: TableCatalogEntry, new_name: TableInfo):
        """Renaming is not supported by this engine; always raises Exception."""
        raise Exception("Rename not supported for structured data table")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc