• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / e4f99d26-a857-4f78-b9e8-041e4254b9e1

18 Aug 2023 05:49AM UTC coverage: 94.434% (-0.8%) from 95.277%
e4f99d26-a857-4f78-b9e8-041e4254b9e1

Pull #935

circle-ci

xzdandy
remove print
Pull Request #935: Ludwig-based model train and tune support.

92 of 92 new or added lines in 10 files covered. (100.0%)

10264 of 10869 relevant lines covered (94.43%)

1.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.19
/evadb/storage/sqlite_storage_engine.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator, List
2✔
16

17
import numpy as np
2✔
18
import pandas as pd
2✔
19
from sqlalchemy import Table, inspect
2✔
20
from sqlalchemy.sql.expression import ColumnElement
2✔
21

22
from evadb.catalog.catalog_type import ColumnType
2✔
23
from evadb.catalog.models.base_model import BaseModel
2✔
24
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
2✔
25
from evadb.catalog.models.table_catalog import TableCatalogEntry
2✔
26
from evadb.catalog.schema_utils import SchemaUtils
2✔
27
from evadb.catalog.sql_config import IDENTIFIER_COLUMN
2✔
28
from evadb.database import EvaDBDatabase
2✔
29
from evadb.models.storage.batch import Batch
2✔
30
from evadb.parser.table_ref import TableInfo
2✔
31
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
2✔
32
from evadb.utils.generic_utils import PickleSerializer, get_size
2✔
33
from evadb.utils.logging_manager import logger
2✔
34

35
# Leveraging Dynamic schema in SQLAlchemy
36
# https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
37

38

39
class SQLStorageEngine(AbstractStorageEngine):
    """Storage engine for structured tables, backed by SQLAlchemy.

    Tables are declared dynamically on ``BaseModel.metadata`` and are accessed
    through the catalog's SQL session and engine. NDARRAY column values are
    serialized with ``PickleSerializer`` on write and deserialized on read.
    """

    def __init__(self, db: EvaDBDatabase):
        """
        Grab the existing sql session
        """
        super().__init__(db)
        self._sql_session = db.catalog().sql_config.session
        self._sql_engine = db.catalog().sql_config.engine
        self._serializer = PickleSerializer

    def _dict_to_sql_row(self, dict_row: dict, columns: List[ColumnCatalogEntry]):
        """Convert one row dict into values SQLAlchemy can bind.

        NDARRAY columns are serialized with ``self._serializer``; numpy scalar
        values (``np.generic``) in other columns are converted to the matching
        Python scalar. ``dict_row`` is modified in place and returned.

        Raises:
            KeyError: if ``dict_row`` has no entry for one of ``columns``.
        """
        for col in columns:
            if col.type == ColumnType.NDARRAY:
                dict_row[col.name] = self._serializer.serialize(dict_row[col.name])
            elif isinstance(dict_row[col.name], (np.generic,)):
                # Sqlalchemy does not consume numpy generic data types
                # convert numpy datatype to python generic datatype using tolist()
                # eg. np.int64 -> int
                # https://stackoverflow.com/a/53067954
                dict_row[col.name] = dict_row[col.name].tolist()
        return dict_row

    def _deserialize_sql_row(self, sql_row: dict, columns: List[ColumnCatalogEntry]):
        """Build an output row dict from a fetched SQL row.

        Only the keys named by ``columns`` are copied; NDARRAY values are
        deserialized with ``self._serializer``.
        """
        dict_row = {}
        for col in columns:
            value = sql_row[col.name]
            if col.type == ColumnType.NDARRAY:
                value = self._serializer.deserialize(value)
            dict_row[col.name] = value
        return dict_row

    def _try_loading_table_via_reflection(self, table_name: str):
        """Return the SQLAlchemy ``Table`` named ``table_name``.

        ``BaseModel.metadata`` is consulted first. If the table is not
        registered there but exists in the database, it is reflected into that
        metadata and returned.

        Raises:
            Exception: if no table with that name exists in the database.
        """
        metadata_obj = BaseModel.metadata
        if table_name in metadata_obj.tables:
            return metadata_obj.tables[table_name]
        # reflection
        insp = inspect(self._sql_engine)
        if insp.has_table(table_name):
            table = Table(table_name, metadata_obj)
            insp.reflect_table(table, None)
            return table
        err_msg = f"No table found with name {table_name}"
        # Not inside an ``except`` block, so there is no traceback to attach;
        # logger.exception here would log a spurious "NoneType: None".
        logger.error(err_msg)
        raise Exception(err_msg)

    def create(self, table: TableCatalogEntry, **kwargs):
        """
        Create an empty table in sql.
        It dynamically constructs schema in sqlalchemy
        to create the table.

        If a table with this name already exists in the database, a warning is
        logged and its SQLAlchemy ``Table`` is returned. Otherwise an instance
        of the dynamically generated declarative class is returned.
        """
        table_name = table.name
        attr_dict = {"__tablename__": table_name}

        # During table creation, assume row_id is automatically handled by
        # the sqlalchemy engine.
        table_columns = [col for col in table.columns if col.name != IDENTIFIER_COLUMN]
        sqlalchemy_schema = SchemaUtils.xform_to_sqlalchemy_schema(table_columns)
        attr_dict.update(sqlalchemy_schema)

        insp = inspect(self._sql_engine)
        if insp.has_table(table_name):
            logger.warning(f"Table {table_name} already exists")
            # The table can exist in the database without being registered in
            # the in-memory metadata (e.g. created by an earlier process), so
            # load it through reflection instead of indexing the metadata.
            return self._try_loading_table_via_reflection(table_name)

        # dynamic schema generation
        # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
        new_table = type(
            f"__placeholder_class_name__{table_name}", (BaseModel,), attr_dict
        )()

        # checkfirst=True keeps the "create only if missing" guard.
        BaseModel.metadata.tables[table_name].create(self._sql_engine, checkfirst=True)
        self._sql_session.commit()
        return new_table

    def drop(self, table: TableCatalogEntry):
        """Drop ``table`` from the database and from ``BaseModel.metadata``.

        Raises:
            Exception: wrapping any underlying failure (including a missing
                table); the original error is chained as ``__cause__``.
        """
        try:
            table_to_remove = self._try_loading_table_via_reflection(table.name)
            insp = inspect(self._sql_engine)
            if insp.has_table(table_to_remove.name):
                table_to_remove.drop(self._sql_engine)
                # In-memory metadata does not automatically sync with the database
                # therefore manually removing the table from the in-memory metadata
                # https://github.com/sqlalchemy/sqlalchemy/issues/5112
                BaseModel.metadata.remove(table_to_remove)
            self._sql_session.commit()
        except Exception as e:
            err_msg = f"Failed to drop the table {table.name} with Exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e

    def write(self, table: TableCatalogEntry, rows: Batch):
        """
        Write rows into the sql table.

        Arguments:
            table: table metadata object to write into
            rows : batch to be persisted in the storage.

        Raises:
            Exception: wrapping any failure while preparing or inserting rows;
                the original error is chained as ``__cause__``.
        """
        try:
            table_to_update = self._try_loading_table_via_reflection(table.name)
            columns = list(rows.frames.keys())

            # During table writes, assume row_id is automatically handled by
            # the sqlalchemy engine. Another assumption we make here is the
            # updated data need not to take care of row_id.
            table_columns = [
                col for col in table.columns if col.name != IDENTIFIER_COLUMN
            ]

            # Todo: validate the data type before inserting into the table
            # itertuples() reads every column with its own dtype, whereas
            # DataFrame.values collapses all columns to one common dtype
            # (e.g. int + float columns -> float64), silently altering values.
            data = [
                self._dict_to_sql_row(dict(zip(columns, record)), table_columns)
                for record in rows.frames.itertuples(index=False, name=None)
            ]

            # An empty parameter list would make the INSERT execute once with
            # no parameters (possibly inserting a row of defaults) rather than
            # doing nothing, so skip it for empty batches.
            if data:
                self._sql_session.execute(table_to_update.insert(), data)
            self._sql_session.commit()
        except Exception as e:
            err_msg = f"Failed to update the table {table.name} with exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e

    def read(
        self, table: TableCatalogEntry, batch_mem_size: int = 30000000
    ) -> Iterator[Batch]:
        """
        Reads the table and return a batch iterator for the
        tuples.

        Argument:
            table: table metadata object of the table to read
            batch_mem_size (int): memory size of the batch read from storage
        Return:
            Iterator of Batch read.
        Raises:
            Exception: wrapping any failure while reading; the original error
                is chained as ``__cause__``.
        """
        try:
            table_to_read = self._try_loading_table_via_reflection(table.name)
            # NOTE(review): fetchall() materializes the whole table before
            # batching; batch_mem_size only bounds the size of yielded batches.
            result = self._sql_session.execute(table_to_read.select()).fetchall()
            data_batch = []
            row_size = None
            for row in result:
                # For table read, we provide row_id so that user can also retrieve
                # row_id from the table.
                data_batch.append(
                    self._deserialize_sql_row(row._asdict(), table.columns)
                )
                if row_size is None:
                    # Estimate the per-row size once, from the batch holding
                    # only the first row, and reuse it for all later rows.
                    row_size = get_size(data_batch)
                if len(data_batch) * row_size >= batch_mem_size:
                    yield Batch(pd.DataFrame(data_batch))
                    data_batch = []
            if data_batch:
                yield Batch(pd.DataFrame(data_batch))

        except Exception as e:
            err_msg = f"Failed to read the table {table.name} with exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e

    def delete(
        self, table: TableCatalogEntry, sqlalchemy_filter_clause: "ColumnElement[bool]"
    ):
        """Delete tuples from the table where rows satisfy the filter clause.

        The clause is passed unchanged to ``Delete.where``.
        NOTE(review): an earlier docstring said only equality predicates are
        handled; this method imposes no such restriction itself — confirm
        whether it is enforced by callers.

        Argument:
            table: table metadata object of the table
            sqlalchemy_filter_clause: clause used to find the tuples to remove.
        Raises:
            Exception: wrapping any failure while deleting; the original error
                is chained as ``__cause__``.
        """
        try:
            table_to_delete_from = self._try_loading_table_via_reflection(table.name)
            d = table_to_delete_from.delete().where(sqlalchemy_filter_clause)
            self._sql_session.execute(d)
            self._sql_session.commit()
        except Exception as e:
            err_msg = (
                f"Failed to delete from the table {table.name} with exception {str(e)}"
            )
            logger.exception(err_msg)
            raise Exception(err_msg) from e

    def rename(self, old_table: TableCatalogEntry, new_name: TableInfo):
        """Renaming is not supported for structured tables; always raises."""
        raise Exception("Rename not supported for structured data table")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc