• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #725

pending completion
#725

push

circle-ci

web-flow
chore: reducing coverage loss (#619)

* adding delete operation

* Adding Insert Statement

* checkpoint

* supporting multiple entries

* implemented for structured data error

* adding parser visitor for delete

* delete executor

* delete plan and rules

* adding delete to plan executor

* change position of LogicalDelete

* logical delimiter

* delete test case

* adding test case

* adding test case

* adding delete testcase

* adding predicate to delete executor

* adding delete to Image storage

* bug fix in delete

* fixing testcase

* adding test case for insert statement

* remove order_by from statement_binder.py

* better variable names, using Batch

* error message for insert

* removing order_by and limit from delete

* remove order_by and limit

* use f-string

* adding to changelog

* removing commit messages

* formatting

* fixing comments

* formatting

* eva insert f32 values

* fix: should delete range

* delete multiple rows

* udf bootstrap

* try to run tests in parallel

* minor fix for ray to work

* ray fixes

---------

Co-authored-by: Aryan-Rajoria <aryanrajoria1003@gmail.com>
Co-authored-by: Gaurav <gaurav21776@gmail.com>

236 of 236 new or added lines in 53 files covered. (100.0%)

7003 of 9005 relevant lines covered (77.77%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.38
/eva/executor/create_index_executor.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import os
1✔
16
from pathlib import Path
1✔
17

18
import faiss
1✔
19
import numpy as np
1✔
20
import pandas as pd
1✔
21

22
from eva.catalog.catalog_manager import CatalogManager
1✔
23
from eva.catalog.catalog_type import IndexType
1✔
24
from eva.catalog.sql_config import IDENTIFIER_COLUMN
1✔
25
from eva.configuration.constants import EVA_DEFAULT_DIR, INDEX_DIR
1✔
26
from eva.executor.abstract_executor import AbstractExecutor
1✔
27
from eva.executor.executor_utils import ExecutorError
1✔
28
from eva.models.storage.batch import Batch
1✔
29
from eva.plan_nodes.create_index_plan import CreateIndexPlan
1✔
30
from eva.storage.storage_engine import StorageEngine
1✔
31
from eva.utils.logging_manager import logger
1✔
32

33

34
def create_faiss_index(index_type: IndexType, input_dim: int, hnsw_m: int = 32):
    """Build an empty Faiss index of ``index_type`` for ``input_dim``-dim vectors.

    References (Faiss wiki):
      IDMap: https://github.com/facebookresearch/faiss/wiki/Pre--and-post-processing#faiss-id-mapping
      Other index types: https://github.com/facebookresearch/faiss/wiki/The-index-factory

    Args:
        index_type: requested index type; only ``IndexType.HNSW`` is handled.
        input_dim: dimensionality of the vectors that will be added.
        hnsw_m: the ``M`` argument passed to ``faiss.IndexHNSWFlat`` (HNSW
            graph connectivity). Defaults to the previously hard-coded 32.

    Returns:
        A ``faiss.IndexIDMap2`` wrapping the underlying index, so vectors can
        be added with caller-supplied IDs via ``add_with_ids``.

    Raises:
        ValueError: if ``index_type`` is not supported by this function.
    """
    if index_type == IndexType.HNSW:
        # HNSW is the actual index. IndexIDMap2 adds a secondary mapping from
        # the index's internal positions to the IDs we supply.
        return faiss.IndexIDMap2(faiss.IndexHNSWFlat(input_dim, hnsw_m))

    # Previously this fell through and implicitly returned None, which only
    # surfaced later as an opaque AttributeError on ``index.add_with_ids``.
    raise ValueError(f"Faiss index type {index_type} is not supported.")
44

45

46
class CreateIndexExecutor(AbstractExecutor):
    """Executor for CREATE INDEX: builds and registers a Faiss index.

    The plan node supplies the index name (``name``), the index type
    (``index_type``), the table to read (``table_ref``), the feature column
    (``col_list[0]``) and an optional UDF expression (``udf_func``) used to
    compute a feature vector from each row.
    """

    def __init__(self, node: CreateIndexPlan):
        super().__init__(node)

    def exec(self, *args, **kwargs):
        """Create the index and yield a one-row status ``Batch``.

        Raises:
            ExecutorError: if an index with the same name already exists, the
                index type is not a Faiss index type, or building, persisting
                or registering the index fails.
        """
        catalog_manager = CatalogManager()
        if catalog_manager.get_index_catalog_entry_by_name(self.node.name):
            msg = f"Index {self.node.name} already exists."
            logger.error(msg)
            raise ExecutorError(msg)

        index_type = self.node.index_type

        # Explicit check rather than ``assert``: asserts are stripped when
        # Python runs with -O, which would silently skip this validation.
        if not IndexType.is_faiss_index_type(index_type):
            msg = "Index type {} is not supported.".format(index_type)
            logger.error(msg)
            raise ExecutorError(msg)

        self._create_faiss_index()

        yield Batch(
            pd.DataFrame(
                [f"Index {self.node.name} successfully added to the database."]
            )
        )

    def _get_index_save_path(self):
        """Return the on-disk path for this index as a string.

        The path is ``EVA_DEFAULT_DIR / INDEX_DIR / "<index_type>_<name>.index"``.
        """
        return str(
            EVA_DEFAULT_DIR
            / INDEX_DIR
            / Path("{}_{}.index".format(self.node.index_type, self.node.name))
        )

    def _create_faiss_index(self):
        """Build, persist and register a Faiss index for ``self.node``.

        The feature table is read one row at a time. For each row, a feature
        vector is obtained either by evaluating ``udf_func`` or directly from
        the feature column. It is added to the index keyed by the row's
        ``IDENTIFIER_COLUMN`` value. The index is then written to
        ``_get_index_save_path()`` and a catalog entry is inserted.

        Raises:
            ExecutorError: wrapping any failure (original exception chained as
                the cause). Any file at the save path is removed first.
        """
        index_path = self._get_index_save_path()
        try:
            catalog_manager = CatalogManager()

            # Get the feature table.
            feat_catalog_entry = self.node.table_ref.table.table_obj

            # Get the feature column; fail with a clear message if it is missing
            # instead of an IndexError from indexing an empty list.
            feat_col_name = self.node.col_list[0].name
            feat_column = next(
                (
                    col
                    for col in feat_catalog_entry.columns
                    if col.name == feat_col_name
                ),
                None,
            )
            if feat_column is None:
                raise ExecutorError(
                    f"Column {feat_col_name} not found in table "
                    f"{feat_catalog_entry.name}."
                )

            # Optional UDF used to compute features from each input row.
            udf_func = self.node.udf_func

            # Add features to the index. The index is created lazily once the
            # first feature vector reveals the input dimensionality.
            index = None
            storage_engine = StorageEngine.factory(feat_catalog_entry)
            # TODO: batch size is hardcoded for now.
            for input_batch in storage_engine.read(feat_catalog_entry, 1):
                if udf_func:
                    # Create the index through the UDF expression:
                    # UDF(input column) -> feature vector.
                    input_batch.modify_column_alias(feat_catalog_entry.name.lower())
                    feat_batch = udf_func.evaluate(input_batch)
                    feat_batch.drop_column_alias()
                    input_batch.drop_column_alias()
                    # NOTE(review): assumes the UDF output exposes its vector
                    # in a column named "features" — confirm against UDFs.
                    feat = feat_batch.column_as_numpy_array("features")[0]
                else:
                    # Create the index on the feature table directly.
                    # Pandas wraps the numpy array as an object inside a numpy
                    # array; index zero yields the actual numpy array.
                    feat = input_batch.column_as_numpy_array(feat_col_name)[0]

                # Faiss expects a 2-D (n_vectors, dim) array.
                feat = feat.reshape(1, -1)

                if index is None:
                    index = create_faiss_index(self.node.index_type, feat.shape[-1])

                # Row ID for mapping search results back to the row.
                row_id = input_batch.column_as_numpy_array(IDENTIFIER_COLUMN)[0]
                index.add_with_ids(feat, np.array([row_id]))

            # An empty table never creates an index; fail clearly instead of
            # passing None to faiss.write_index.
            if index is None:
                raise ExecutorError(
                    f"Cannot create index {self.node.name}: table "
                    f"{feat_catalog_entry.name} has no rows."
                )

            # Persist the index.
            faiss.write_index(index, index_path)

            # Save to the catalog.
            catalog_manager.insert_index_catalog_entry(
                self.node.name,
                index_path,
                self.node.index_type,
                feat_column,
                udf_func.signature() if udf_func else None,
            )
        except Exception as e:
            # Roll back in reverse order: delete the on-disk index.
            if os.path.exists(index_path):
                os.remove(index_path)

            # Surface the failure to the user, keeping the original cause.
            raise ExecutorError(str(e)) from e
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc