• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / f3cb8bbb-8165-49fa-af41-b587f634b3c4

pending completion
f3cb8bbb-8165-49fa-af41-b587f634b3c4

Pull #814

circle-ci

jiashenC
update results
Pull Request #814: feat: benchmark question answering v1

42 of 42 new or added lines in 2 files covered. (100.0%)

10095 of 10421 relevant lines covered (96.87%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.88
/eva/storage/sqlite_storage_engine.py
1
# coding=utf-8
2
# Copyright 2018-2023 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator, List
1✔
16

17
import numpy as np
1✔
18
import pandas as pd
1✔
19
from sqlalchemy import Table, inspect
1✔
20
from sqlalchemy.sql.expression import ColumnElement
1✔
21

22
from eva.catalog.catalog_type import ColumnType
1✔
23
from eva.catalog.models.base_model import BaseModel
1✔
24
from eva.catalog.models.column_catalog import ColumnCatalogEntry
1✔
25
from eva.catalog.models.table_catalog import TableCatalogEntry
1✔
26
from eva.catalog.schema_utils import SchemaUtils
1✔
27
from eva.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig
1✔
28
from eva.models.storage.batch import Batch
1✔
29
from eva.parser.table_ref import TableInfo
1✔
30
from eva.storage.abstract_storage_engine import AbstractStorageEngine
1✔
31
from eva.utils.generic_utils import PickleSerializer, get_size
1✔
32
from eva.utils.logging_manager import logger
1✔
33

34
# Leveraging Dynamic schema in SQLAlchemy
35
# https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
36

37

38
class SQLStorageEngine(AbstractStorageEngine):
1✔
39
    def __init__(self):
        """Attach this storage engine to the session and engine of SQLConfig.

        The serializer is kept as a reference to ``PickleSerializer`` itself
        (it is not instantiated here).
        """
        session = SQLConfig().session
        engine = SQLConfig().engine
        self._sql_session, self._sql_engine = session, engine
        self._serializer = PickleSerializer
46

47
    def _dict_to_sql_row(self, dict_row: dict, columns: List[ColumnCatalogEntry]):
        """Make the values of ``dict_row`` storable through sqlalchemy.

        The dictionary is modified in place and also returned.

        Arguments:
            dict_row (dict): column name -> value for a single row
            columns: catalog entries of the columns to convert

        Returns:
            The same ``dict_row`` object, with NDARRAY columns serialized and
            numpy scalar values replaced by plain python values.
        """
        for column in columns:
            if column.type == ColumnType.NDARRAY:
                # Array columns are persisted in serialized form.
                dict_row[column.name] = self._serializer.serialize(
                    dict_row[column.name]
                )
                continue

            value = dict_row[column.name]
            if isinstance(value, np.generic):
                # Sqlalchemy does not consume numpy generic data types, so
                # convert them to python built-ins with tolist()
                # eg. np.int64 -> int
                # https://stackoverflow.com/a/53067954
                dict_row[column.name] = value.tolist()
        return dict_row
59

60
    def _sql_row_to_dict(self, sql_row: tuple, columns: List[ColumnCatalogEntry]):
        """Turn a positional sql row into a ``{column name: value}`` dictionary.

        The value for ``columns[i]`` is read from ``sql_row[i]``. Values of
        NDARRAY columns are deserialized; all other values are copied as is.

        Arguments:
            sql_row (tuple): row values, in the same order as ``columns``
            columns: catalog entries describing each position of the row

        Returns:
            dict mapping every column name to its (deserialized) value.
        """
        return {
            column.name: (
                self._serializer.deserialize(sql_row[position])
                if column.type == ColumnType.NDARRAY
                else sql_row[position]
            )
            for position, column in enumerate(columns)
        }
69

70
    def _try_loading_table_via_reflection(self, table_name: str):
        """Return the sqlalchemy table registered under ``table_name``.

        The in-memory ``BaseModel.metadata`` is consulted first. If the table
        is not registered there but the database has a table with that name,
        it is reflected into the metadata and returned.

        Arguments:
            table_name (str): name of the table to look up

        Returns:
            The sqlalchemy ``Table`` object for ``table_name``.

        Raises:
            Exception: if the table is neither in the metadata nor in the
                database. Errors raised while reflecting are propagated.
        """
        metadata_obj = BaseModel.metadata
        if table_name in metadata_obj.tables:
            return metadata_obj.tables[table_name]

        # reflection
        insp = inspect(self._sql_engine)
        if not insp.has_table(table_name):
            err_msg = f"No table found with name {table_name}"
            # No exception is being handled at this point, so use error();
            # exception() is meant to be called from an exception handler.
            logger.error(err_msg)
            raise Exception(err_msg)

        table = Table(table_name, metadata_obj)
        try:
            insp.reflect_table(table, None)
        except Exception:
            # The Table was attached to the metadata on construction. Remove
            # the half-built entry so that a later lookup does not return a
            # table without columns from the in-memory cache.
            metadata_obj.remove(table)
            raise
        return table
84

85
    def create(self, table: TableCatalogEntry, **kwargs):
        """
        Create an empty table in sql.

        The schema is constructed dynamically in sqlalchemy: a throwaway
        subclass of ``BaseModel`` is declared from the catalog columns, and
        the matching table is then looked up in ``BaseModel.metadata`` and
        created in the database if it does not exist yet.

        Arguments:
            table: table metadata object describing the table to create
            **kwargs: accepted for interface compatibility; not used here

        Returns:
            An instance of the dynamically generated model class.
        """
        # During table creation, assume row_id is automatically handled by
        # the sqlalchemy engine.
        table_columns = [col for col in table.columns if col.name != IDENTIFIER_COLUMN]

        attr_dict = {"__tablename__": table.name}
        attr_dict.update(SchemaUtils.xform_to_sqlalchemy_schema(table_columns))

        # dynamic schema generation
        # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
        new_table = type("__placeholder_class_name__", (BaseModel,), attr_dict)()

        # Use a separate name for the sqlalchemy table instead of rebinding the
        # `table` argument, which is the catalog entry.
        sql_table = BaseModel.metadata.tables[table.name]
        # checkfirst=True asks sqlalchemy to check for the table before issuing
        # CREATE TABLE; this replaces the deprecated Table.exists() call and
        # performs the check on the engine that is used for the creation.
        sql_table.create(self._sql_engine, checkfirst=True)
        self._sql_session.commit()
        return new_table
107

108
    def drop(self, table: TableCatalogEntry):
        """Drop the sql table that backs ``table``.

        The table is dropped from the database and then removed from the
        in-memory ``BaseModel.metadata``.

        Arguments:
            table: table metadata object of the table to drop

        Raises:
            Exception: if the table cannot be found or dropped. The original
                error is attached as the cause.
        """
        try:
            table_to_remove = self._try_loading_table_via_reflection(table.name)
            # Pass the engine explicitly, as create() does, rather than relying
            # on the metadata being bound to an engine.
            table_to_remove.drop(self._sql_engine)
            # In-memory metadata does not automatically sync with the database
            # therefore manually removing the table from the in-memory metadata
            # https://github.com/sqlalchemy/sqlalchemy/issues/5112
            BaseModel.metadata.remove(table_to_remove)
            self._sql_session.commit()
        except Exception as e:
            err_msg = f"Failed to drop the table {table.name} with Exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e
121

122
    def write(self, table: TableCatalogEntry, rows: Batch):
        """
        Write rows into the sql table.

        Arguments:
            table: table metadata object to write into
            rows : batch to be persisted in the storage.

        Raises:
            Exception: if the table cannot be loaded or the insert fails. The
                original error is attached as the cause.
        """
        try:
            table_to_update = self._try_loading_table_via_reflection(table.name)
            columns = rows.frames.keys()

            # During table writes, assume row_id is automatically handled by
            # the sqlalchemy engine. Another assumption we make here is the
            # updated data need not to take care of row_id.
            table_columns = [
                col for col in table.columns if col.name != IDENTIFIER_COLUMN
            ]

            # Todo: validate the data type before inserting into the table
            data = []
            for record in rows.frames.values:
                row_data = {col: record[idx] for idx, col in enumerate(columns)}
                data.append(self._dict_to_sql_row(row_data, table_columns))

            if data:
                self._sql_engine.execute(table_to_update.insert(), data)
            # else: nothing to persist; skip the INSERT rather than executing
            # it with an empty parameter list.
            self._sql_session.commit()
        except Exception as e:
            err_msg = f"Failed to update the table {table.name} with exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e
152

153
    def read(
        self, table: TableCatalogEntry, batch_mem_size: int = 30000000
    ) -> Iterator[Batch]:
        """
        Reads the table and return a batch iterator for the
        tuples.

        Rows are accumulated until the estimated size of the accumulated rows
        (number of rows times the size measured for the first row) reaches
        ``batch_mem_size``; the remaining rows are yielded as a final batch.

        Argument:
            table: table metadata object of the table to read
            batch_mem_size (int): memory size of the batch read from storage
        Return:
            Iterator of Batch read.
        Raises:
            Exception: if the table cannot be loaded or read. The original
                error is attached as the cause.
        """
        try:
            table_to_read = self._try_loading_table_via_reflection(table.name)
            result = self._sql_engine.execute(table_to_read.select())
            try:
                data_batch = []
                row_size = None
                for row in result:
                    # Todo: Verify the order of columns in row matches the table.columns
                    # For table read, we provide row_id so that user can also retrieve
                    # row_id from the table.
                    data_batch.append(self._sql_row_to_dict(row, table.columns))
                    if row_size is None:
                        # Measured once, while the batch holds only the first
                        # row, and reused as the per-row estimate afterwards.
                        row_size = get_size(data_batch)
                    if len(data_batch) * row_size >= batch_mem_size:
                        yield Batch(pd.DataFrame(data_batch))
                        data_batch = []
                if data_batch:
                    yield Batch(pd.DataFrame(data_batch))
            finally:
                # Release the result even when the consumer stops iterating
                # before all rows have been fetched.
                result.close()

        except Exception as e:
            err_msg = f"Failed to read the table {table.name} with exception {str(e)}"
            logger.exception(err_msg)
            raise Exception(err_msg) from e
189

190
    def delete(
        self, table: TableCatalogEntry, sqlalchemy_filter_clause: ColumnElement[bool]
    ):
        """Delete tuples from the table where rows satisfy the filter clause.

        The clause is applied unchanged as the WHERE condition of the DELETE
        statement.
        NOTE(review): earlier documentation stated that only equality
        predicates are handled; nothing in this method enforces that, so the
        restriction (if any) presumably comes from the caller that builds the
        clause -- confirm.

        Argument:
            table: table metadata object of the table
            sqlalchemy_filter_clause: sqlalchemy expression used to find the
                tuples to remove.

        Raises:
            Exception: if the table cannot be loaded or the delete fails. The
                original error is attached as the cause.
        """
        try:
            table_to_delete_from = self._try_loading_table_via_reflection(table.name)
            d = table_to_delete_from.delete().where(sqlalchemy_filter_clause)
            self._sql_engine.execute(d)
            self._sql_session.commit()
        except Exception as e:
            err_msg = (
                f"Failed to delete from the table {table.name} with exception {str(e)}"
            )
            logger.exception(err_msg)
            raise Exception(err_msg) from e
211

212
    def rename(self, old_table: TableCatalogEntry, new_name: TableInfo):
        """Renaming is not available for this engine; always raises.

        Arguments:
            old_table: table metadata object of the table (unused)
            new_name: requested new name (unused)

        Raises:
            Exception: on every call.
        """
        err_msg = "Rename not supported for structured data table"
        raise Exception(err_msg)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc