• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / a4c010ba-78be-4818-8e6f-1da08c6af280

31 Aug 2023 11:59PM UTC coverage: 70.992% (-10.6%) from 81.552%
a4c010ba-78be-4818-8e6f-1da08c6af280

push

circle-ci

web-flow
Merge branch 'staging' into evadb_staging

54 of 54 new or added lines in 3 files covered. (100.0%)

8020 of 11297 relevant lines covered (70.99%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/evadb/storage/sqlite_storage_engine.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator, List
1✔
16

17
import numpy as np
1✔
18
import pandas as pd
1✔
19
from sqlalchemy import Table, inspect
1✔
20
from sqlalchemy.sql.expression import ColumnElement
1✔
21

22
from evadb.catalog.catalog_type import ColumnType
1✔
23
from evadb.catalog.models.base_model import BaseModel
1✔
24
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
1✔
25
from evadb.catalog.models.table_catalog import TableCatalogEntry
1✔
26
from evadb.catalog.schema_utils import SchemaUtils
1✔
27
from evadb.catalog.sql_config import IDENTIFIER_COLUMN
1✔
28
from evadb.database import EvaDBDatabase
1✔
29
from evadb.models.storage.batch import Batch
1✔
30
from evadb.parser.table_ref import TableInfo
1✔
31
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
1✔
32
from evadb.utils.generic_utils import PickleSerializer, get_size
1✔
33
from evadb.utils.logging_manager import logger
1✔
34

35
# Leveraging Dynamic schema in SQLAlchemy
36
# https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
37

38

39
class SQLStorageEngine(AbstractStorageEngine):
1✔
40
    def __init__(self, db: EvaDBDatabase):
        """
        Bind this storage engine to the catalog's existing sql session.

        Arguments:
            db: database handle; the session and the engine are both taken
                from ``db.catalog().sql_config``.
        """
        super().__init__(db)
        # Serializer used by the row conversion helpers for NDARRAY columns.
        self._serializer = PickleSerializer
        self._sql_session = db.catalog().sql_config.session
        self._sql_engine = db.catalog().sql_config.engine
1✔
48

49
    def _dict_to_sql_row(self, dict_row: dict, columns: List[ColumnCatalogEntry]):
        """
        Convert the values of a row into forms that can be handed to sqlalchemy.

        Values of NDARRAY columns are passed through the serializer; any other
        value that is a numpy scalar is converted to a plain Python value.

        Arguments:
            dict_row: mapping of column name to value; it is modified in place
            columns: catalog entries of the columns to convert
        Return:
            The same ``dict_row`` object after conversion.
        """
        for column in columns:
            key = column.name
            if column.type == ColumnType.NDARRAY:
                # Serialize numpy data
                dict_row[key] = self._serializer.serialize(dict_row[key])
                continue
            if isinstance(dict_row[key], np.generic):
                # Sqlalchemy does not consume numpy generic data types, so
                # convert them to python generic datatypes using tolist()
                # eg. np.int64 -> int
                # https://stackoverflow.com/a/53067954
                dict_row[key] = dict_row[key].tolist()
        return dict_row
1✔
61

62
    def _deserialize_sql_row(self, sql_row: dict, columns: List[ColumnCatalogEntry]):
        """
        Build a plain dict row from a row read back from sql.

        Values of NDARRAY columns are passed through the serializer's
        ``deserialize``; all other values are copied unchanged. Only the
        columns listed in ``columns`` appear in the result.

        Arguments:
            sql_row: mapping of column name to the value stored in sql
            columns: catalog entries of the columns to extract
        Return:
            A new dict mapping each column name to its deserialized value.
        Raises:
            KeyError: if ``sql_row`` has no entry for one of the columns.
        """
        # Deserialize numpy data
        return {
            col.name: (
                self._serializer.deserialize(sql_row[col.name])
                if col.type == ColumnType.NDARRAY
                else sql_row[col.name]
            )
            for col in columns
        }
1✔
71

72
    def _try_loading_table_via_reflection(self, table_name: str):
        """
        Return the sqlalchemy table object for ``table_name``.

        The table is taken from the in-memory ``BaseModel.metadata`` when it is
        already registered there; otherwise it is reflected from the database.

        Arguments:
            table_name: name of the table to load
        Return:
            The table found in, or reflected into, ``BaseModel.metadata``.
        Raises:
            Exception: if the database has no table with that name.
        """
        metadata_obj = BaseModel.metadata
        if table_name in metadata_obj.tables:
            return metadata_obj.tables[table_name]

        # reflection
        insp = inspect(self._sql_engine)
        if not insp.has_table(table_name):
            err_msg = f"No table found with name {table_name}"
            # No exception is being handled at this point, so log with error();
            # exception() is meant to be called from an exception handler and
            # would attach an empty traceback here.
            logger.error(err_msg)
            raise Exception(err_msg)

        table = Table(table_name, metadata_obj)
        insp.reflect_table(table, None)
        return table
86

87
    def create(self, table: TableCatalogEntry, **kwargs):
        """
        Create an empty table in sql.
        It dynamically constructs the schema in sqlalchemy
        to create the table.

        Arguments:
            table: table metadata object describing the table to create
            **kwargs: accepted but not used by this implementation
        Return:
            An instance of the dynamically generated model class when the
            table is newly declared, or the existing sqlalchemy table object
            when the database already has a table with that name.
        """
        attr_dict = {"__tablename__": table.name}

        # During table creation, assume row_id is automatically handled by
        # the sqlalchemy engine.
        table_columns = [col for col in table.columns if col.name != IDENTIFIER_COLUMN]
        sqlalchemy_schema = SchemaUtils.xform_to_sqlalchemy_schema(table_columns)
        attr_dict.update(sqlalchemy_schema)

        insp = inspect(self._sql_engine)
        if insp.has_table(table.name):
            logger.warning(f"Table {table.name} already exists")
            # The table can exist in the database without being registered in
            # the in-memory metadata; the helper returns the registered table
            # when there is one and reflects it otherwise, instead of failing
            # with a KeyError on a direct metadata lookup.
            return self._try_loading_table_via_reflection(table.name)

        # dynamic schema generation
        # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
        new_table = type(
            f"__placeholder_class_name__{table.name}", (BaseModel,), attr_dict
        )()
        # The lookup below relies on the class declaration above having
        # registered the table in BaseModel.metadata.
        sql_table = BaseModel.metadata.tables[table.name]

        # Re-check before issuing the CREATE TABLE statement.
        if not insp.has_table(sql_table.name):
            sql_table.create(self._sql_engine)
        self._sql_session.commit()
        return new_table
1✔
117

118
    def drop(self, table: TableCatalogEntry):
        """
        Drop the table from the database and from the in-memory metadata.

        Arguments:
            table: table metadata object of the table to drop
        Raises:
            Exception: if the table cannot be loaded or dropped; the original
                error is attached as the cause.
        """
        try:
            table_to_remove = self._try_loading_table_via_reflection(table.name)
            insp = inspect(self._sql_engine)
            if insp.has_table(table_to_remove.name):
                table_to_remove.drop(self._sql_engine)
                # In-memory metadata does not automatically sync with the database
                # therefore manually removing the table from the in-memory metadata
                # https://github.com/sqlalchemy/sqlalchemy/issues/5112
                BaseModel.metadata.remove(table_to_remove)
            self._sql_session.commit()
        except Exception as e:
            err_msg = f"Failed to drop the table {table.name} with Exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e
133

134
    def write(self, table: TableCatalogEntry, rows: Batch):
        """
        Write rows into the sql table.

        Arguments:
            table: table metadata object to write into
            rows : batch to be persisted in the storage.
        Raises:
            Exception: if the table cannot be loaded or the insert fails; the
                original error is attached as the cause.
        """
        try:
            table_to_update = self._try_loading_table_via_reflection(table.name)
            columns = rows.frames.keys()

            # During table writes, assume row_id is automatically handled by
            # the sqlalchemy engine. Another assumption we make here is the
            # updated data need not to take care of row_id.
            table_columns = [
                col for col in table.columns if col.name != IDENTIFIER_COLUMN
            ]

            # Todo: validate the data type before inserting into the table
            data = [
                self._dict_to_sql_row(dict(zip(columns, record)), table_columns)
                for record in rows.frames.values
            ]

            # Skip the INSERT for an empty batch. An empty parameter list is
            # not an "insert nothing" request: presumably sqlalchemy runs the
            # statement once without parameters - TODO confirm.
            if data:
                self._sql_session.execute(table_to_update.insert(), data)
            self._sql_session.commit()
        except Exception as e:
            err_msg = f"Failed to update the table {table.name} with exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e
164

165
    def read(
        self, table: TableCatalogEntry, batch_mem_size: int = 30000000
    ) -> Iterator[Batch]:
        """
        Reads the table and return a batch iterator for the
        tuples.

        This is a generator: the query is only executed once iteration
        starts, and all rows are fetched before the first batch is yielded.

        Argument:
            table: table metadata object of the table to read
            batch_mem_size (int): memory size of the batch read from storage
        Return:
            Iterator of Batch read.
        Raises:
            Exception: if loading or reading the table fails during
                iteration; the original error is attached as the cause.
        """
        try:
            table_to_read = self._try_loading_table_via_reflection(table.name)
            result = self._sql_session.execute(table_to_read.select()).fetchall()
            data_batch = []
            row_size = None
            for row in result:
                # For table read, we provide row_id so that user can also retrieve
                # row_id from the table.
                data_batch.append(
                    self._deserialize_sql_row(row._asdict(), table.columns)
                )
                if row_size is None:
                    # Measured once, while the batch holds only the first row,
                    # and reused as the per-row estimate for every later row.
                    row_size = get_size(data_batch)
                if len(data_batch) * row_size >= batch_mem_size:
                    yield Batch(pd.DataFrame(data_batch))
                    data_batch = []
            # Emit whatever is left over after the last full batch.
            if data_batch:
                yield Batch(pd.DataFrame(data_batch))

        except Exception as e:
            err_msg = f"Failed to read the table {table.name} with exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e
202

203
    def delete(
        self, table: TableCatalogEntry, sqlalchemy_filter_clause: "ColumnElement[bool]"
    ):
        """Delete tuples from the table where rows satisfy the filter clause.

        The clause is passed unchanged to the delete statement's ``where``.
        NOTE(review): an earlier version of this docstring said only equality
        predicates are handled; nothing in this method restricts the clause -
        confirm against the callers that build it.

        Argument:
            table: table metadata object of the table
            sqlalchemy_filter_clause: clause used to find the tuples to remove.
        Raises:
            Exception: if the table cannot be loaded or the delete fails; the
                original error is attached as the cause.
        """
        try:
            table_to_delete_from = self._try_loading_table_via_reflection(table.name)
            d = table_to_delete_from.delete().where(sqlalchemy_filter_clause)
            self._sql_session.execute(d)
            self._sql_session.commit()
        except Exception as e:
            err_msg = (
                f"Failed to delete from the table {table.name} with exception {str(e)}"
            )
            logger.exception(err_msg)
            raise Exception(err_msg) from e
224

225
    def rename(self, old_table: TableCatalogEntry, new_name: TableInfo):
        """
        Renaming is not supported by this storage engine.

        Arguments:
            old_table: table metadata object of the table to rename (unused)
            new_name: requested new table name (unused)
        Raises:
            Exception: always.
        """
        raise Exception("Rename not supported for structured data table")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc