• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / d65c6213-51c3-43ff-bbf7-2f70c812864d

pending completion
d65c6213-51c3-43ff-bbf7-2f70c812864d

Pull #551

circle-ci

jarulraj
fixes
Pull Request #551: feat: add support for aggregates and toxicity classification

88 of 88 new or added lines in 10 files covered. (100.0%)

8210 of 8736 relevant lines covered (93.98%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.0
/eva/executor/create_index_executor.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import os
1✔
16
from pathlib import Path
1✔
17

18
import faiss
1✔
19
import pandas as pd
1✔
20

21
from eva.catalog.catalog_manager import CatalogManager
1✔
22
from eva.catalog.catalog_type import ColumnType, IndexType, TableType
1✔
23
from eva.catalog.sql_config import IDENTIFIER_COLUMN
1✔
24
from eva.configuration.constants import EVA_DEFAULT_DIR, INDEX_DIR
1✔
25
from eva.executor.abstract_executor import AbstractExecutor
1✔
26
from eva.executor.executor_utils import ExecutorError
1✔
27
from eva.models.storage.batch import Batch
1✔
28
from eva.parser.create_statement import ColConstraintInfo, ColumnDefinition
1✔
29
from eva.plan_nodes.create_index_plan import CreateIndexPlan
1✔
30
from eva.storage.storage_engine import StorageEngine
1✔
31
from eva.utils.logging_manager import logger
1✔
32

33

34
class CreateIndexExecutor(AbstractExecutor):
    """Build a FAISS index over one feature column of a table.

    ``exec`` performs these steps:

    * reads every row of the table referenced by the plan node;
    * adds the feature vectors to a FAISS index and persists it to disk;
    * writes a secondary table mapping each logical id in the index back to
      the source table's row id;
    * registers the index in the catalog.

    On failure, the on-disk index file and the secondary table are rolled
    back.
    """

    def __init__(self, node: CreateIndexPlan):
        super().__init__(node)

    def validate(self):
        pass

    def exec(self):
        """Create the index described by ``self.node`` and yield a status batch.

        Yields:
            A single ``Batch`` holding a one-row success message.

        Raises:
            ExecutorError: if an index with the same name already exists, or if
                any step of building, persisting, or registering the index
                fails. In the latter case the original exception is chained as
                ``__cause__``.
        """
        catalog_manager = CatalogManager()
        if catalog_manager.get_index_catalog_entry_by_name(self.node.name):
            msg = f"Index {self.node.name} already exists."
            logger.error(msg)
            raise ExecutorError(msg)

        index_save_path = self._get_index_save_path()
        try:
            # Get feature table.
            feat_catalog_entry = self.node.table_ref.table.table_obj

            # Get feature column; fail with a clear message instead of an
            # opaque IndexError when it is not part of the table.
            feat_col_name = self.node.col_list[0].name
            feat_column = next(
                (
                    col
                    for col in feat_catalog_entry.columns
                    if col.name == feat_col_name
                ),
                None,
            )
            if feat_column is None:
                raise ExecutorError(f"Column {feat_col_name} does not exist.")

            # Add features to index.
            # TODO: batch size is hardcoded for now.
            # Logical id i is the position of the i-th batch; this assumes each
            # batch adds exactly one vector to the index -- TODO confirm.
            index = None
            logical_ids, row_ids = [], []
            storage_engine = StorageEngine.factory(feat_catalog_entry)
            batches = storage_engine.read(feat_catalog_entry, 1)
            for logical_id, batch in enumerate(batches):
                # Pandas wraps numpy array as an object inside a numpy
                # array. Use zero index to get the actual numpy array.
                feat = batch.column_as_numpy_array(feat_col_name)[0]
                if index is None:
                    # Input dimension is inferred from the first feature.
                    input_dim = feat.shape[-1]
                    index = self._create_index(self.node.index_type, input_dim)
                index.add(feat)

                # Secondary mapping.
                row_id = batch.column_as_numpy_array(IDENTIFIER_COLUMN)[0]
                logical_ids.append(logical_id)
                row_ids.append(row_id)

            # Nothing was read, so no index exists to persist; fail before
            # creating any on-disk or catalog artifacts.
            if index is None:
                raise ExecutorError(
                    f"Cannot create index {self.node.name}: no rows to index."
                )

            # Build secondary index mapping table.
            secondary_index_catalog_entry = self._create_secondary_index_table()
            storage_engine = StorageEngine.factory(secondary_index_catalog_entry)
            storage_engine.create(secondary_index_catalog_entry)

            # Write mapping.
            secondary_index = Batch(
                pd.DataFrame(data={"logical_id": logical_ids, "row_id": row_ids})
            )
            storage_engine.write(secondary_index_catalog_entry, secondary_index)

            # Persist index.
            faiss.write_index(index, index_save_path)

            # Save to catalog.
            catalog_manager.insert_index_catalog_entry(
                self.node.name,
                index_save_path,
                self.node.index_type,
                secondary_index_catalog_entry,
                feat_column,
            )

            yield Batch(
                pd.DataFrame(
                    [f"Index {self.node.name} successfully added to the database."]
                )
            )
        except Exception as e:
            # Cleanup is best-effort: a failure while rolling back must not
            # mask the original error reported to the user.
            try:
                self._rollback(catalog_manager, index_save_path)
            except Exception as cleanup_error:
                logger.error(
                    f"Failed to roll back index {self.node.name}: {cleanup_error}"
                )

            # Throw exception back to user, keeping the original traceback.
            raise ExecutorError(str(e)) from e

    def _rollback(self, catalog_manager, index_save_path):
        """Undo the artifacts a failed ``exec`` may have created, newest first.

        Removes the on-disk index file, then drops the secondary index table
        from storage and the catalog, if each exists.
        """
        # Delete on-disk index.
        if os.path.exists(index_save_path):
            os.remove(index_save_path)

        # Drop secondary index table.
        secondary_index_tb_name = self._get_secondary_index_table_name()
        if catalog_manager.check_table_exists(secondary_index_tb_name):
            secondary_index_catalog_entry = catalog_manager.get_table_catalog_entry(
                secondary_index_tb_name
            )
            storage_engine = StorageEngine.factory(secondary_index_catalog_entry)
            storage_engine.drop(secondary_index_catalog_entry)
            catalog_manager.delete_table_catalog_entry(secondary_index_catalog_entry)

    def _get_index_save_path(self):
        """Return the on-disk path of the FAISS index file as a string."""
        return str(
            EVA_DEFAULT_DIR
            / INDEX_DIR
            / Path("{}_{}.index".format(self.node.index_type, self.node.name))
        )

    def _get_secondary_index_table_name(self):
        """Return the catalog name of the logical-id -> row-id mapping table."""
        return "secondary_index_{}_{}".format(self.node.index_type, self.node.name)

    # Comment out since Index IO is not needed for now.
    # def _get_index_io_list(self, input_dim):
    #     # Input dimension is inferred from the actual feature.
    #     catalog_manager = CatalogManager()
    #     input_index_io = catalog_manager.index_io(
    #         "input_feature",
    #         ColumnType.NDARRAY,
    #         NdArrayType.FLOAT32,
    #         [Dimension.ANYDIM, input_dim],
    #         True,
    #     )

    #     # Output dimension depends on number of searched
    #     # feature vectors and top N similar feature vectors.
    #     # IndexIO has detailed documentation about input and
    #     # output format of index.
    #     id_index_io = catalog_manager.index_io(
    #         "logical_id",
    #         ColumnType.NDARRAY,
    #         NdArrayType.INT64,
    #         [Dimension.ANYDIM, Dimension.ANYDIM],
    #         False,
    #     )
    #     distance_index_io = catalog_manager.index_io(
    #         "distance",
    #         ColumnType.NDARRAY,
    #         NdArrayType.FLOAT32,
    #         [Dimension.ANYDIM, Dimension.ANYDIM],
    #         False,
    #     )

    #     return [input_index_io, id_index_io, distance_index_io]

    def _create_secondary_index_table(self):
        """Register the table mapping index logical ids to source row ids.

        The table is named after the index type and index name, and is stored
        next to the index file under INDEX_DIR with a ``.secindex`` suffix.

        Returns:
            The table catalog entry returned by ``insert_table_catalog_entry``.
        """
        catalog_manager = CatalogManager()
        col_list = [
            ColumnDefinition(
                "logical_id",
                ColumnType.INTEGER,
                None,
                [],
                ColConstraintInfo(unique=True),
            ),
            ColumnDefinition(
                "row_id",
                ColumnType.INTEGER,
                None,
                [],
                ColConstraintInfo(unique=True),
            ),
        ]
        col_catalog_entries = (
            catalog_manager.xform_column_definitions_to_catalog_entries(col_list)
        )

        return catalog_manager.insert_table_catalog_entry(
            self._get_secondary_index_table_name(),
            str(
                EVA_DEFAULT_DIR
                / INDEX_DIR
                / Path("{}_{}.secindex".format(self.node.index_type, self.node.name))
            ),
            col_catalog_entries,
            identifier_column="logical_id",
            table_type=TableType.STRUCTURED_DATA,
        )

    def _create_index(self, index_type: IndexType, input_dim: int):
        """Return an empty FAISS index of ``index_type`` for ``input_dim``-d vectors.

        Raises:
            ExecutorError: if ``index_type`` is not supported (only HNSW is).
        """
        if index_type == IndexType.HNSW:
            # 32 is the HNSW graph connectivity (neighbors per node).
            return faiss.IndexHNSWFlat(input_dim, 32)
        else:
            raise ExecutorError(f"Index Type {index_type} is not supported.")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc