• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #850

08 Nov 2023 08:36PM UTC coverage: 0.0% (-77.0%) from 76.982%
#850

push

circleci

americast
fix metrics logic

0 of 1 new or added line in 1 file covered. (0.0%)

9789 existing lines in 252 files now uncovered.

0 of 12428 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/executor/create_index_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
UNCOV
15
from pathlib import Path
×
16

UNCOV
17
import pandas as pd
×
18

UNCOV
19
from evadb.catalog.catalog_type import VectorStoreType
×
UNCOV
20
from evadb.catalog.sql_config import ROW_NUM_COLUMN
×
UNCOV
21
from evadb.database import EvaDBDatabase
×
UNCOV
22
from evadb.executor.abstract_executor import AbstractExecutor
×
UNCOV
23
from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params
×
UNCOV
24
from evadb.expression.function_expression import FunctionExpression
×
UNCOV
25
from evadb.models.storage.batch import Batch
×
UNCOV
26
from evadb.plan_nodes.create_index_plan import CreateIndexPlan
×
UNCOV
27
from evadb.third_party.databases.interface import get_database_handler
×
UNCOV
28
from evadb.third_party.vector_stores.types import FeaturePayload
×
UNCOV
29
from evadb.third_party.vector_stores.utils import VectorStoreFactory
×
UNCOV
30
from evadb.utils.logging_manager import logger
×
31

32

UNCOV
33
class CreateIndexExecutor(AbstractExecutor):
    """Executor for ``CREATE INDEX`` plans.

    For ``VectorStoreType.PGVECTOR`` the index is created by the native storage
    engine of the table's database. For every other vector store type, EvaDB
    builds the index itself: it reads feature rows from the first child
    executor and adds them to a vector store obtained from
    ``VectorStoreFactory``.
    """

    def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan):
        super().__init__(db, node)

        # Cache the plan-node fields used during execution.
        self.name = self.node.name
        self.if_not_exists = self.node.if_not_exists
        self.table_ref = self.node.table_ref
        self.col_list = self.node.col_list
        self.vector_store_type = self.node.vector_store_type
        self.project_expr_list = self.node.project_expr_list
        self.index_def = self.node.index_def

    def exec(self, *args, **kwargs):
        """Create the index, then yield a single status ``Batch``.

        Note: the success message is also yielded when ``IF NOT EXISTS`` makes
        ``_create_evadb_index`` skip a mismatching existing index.

        Raises:
            ExecutorError: propagated from the index-creation helpers.
        """
        # Vector type specific creation.
        if self.vector_store_type == VectorStoreType.PGVECTOR:
            self._create_native_index()
        else:
            self._create_evadb_index()

        yield Batch(
            pd.DataFrame([f"Index {self.name} successfully added to the database."])
        )

    def _create_native_index(self):
        """Create the index through the native storage engine.

        Runs a ``CREATE INDEX ... USING hnsw (<col> vector_l2_ops)`` statement
        on the database that owns the referenced table.

        Raises:
            ExecutorError: if the native engine reports an error.
        """
        table = self.table_ref.table
        db_catalog_entry = self.catalog().get_database_catalog_entry(
            table.database_name
        )

        with get_database_handler(
            db_catalog_entry.engine, **db_catalog_entry.params
        ) as handler:
            # As other libraries, we default to HNSW and L2 distance.
            # NOTE(review): identifiers are interpolated into the SQL text (they
            # cannot be bound as parameters); this assumes they come from the
            # parser/binder rather than raw user input — confirm.
            resp = handler.execute_native_query(
                f"""CREATE INDEX {self.name} ON {table.table_name}
                    USING hnsw ({self.col_list[0].name} vector_l2_ops)"""
            )
            if resp.error is not None:
                raise ExecutorError(
                    f"Native engine create index encounters error: {resp.error}"
                )

    def _get_evadb_index_save_path(self) -> str:
        """Return the on-disk path for this EvaDB index as a string.

        The directory comes from the ``index_dir`` configuration value and is
        created (with parents) if it does not exist yet.
        """
        index_dir = Path(self.catalog().get_configuration_catalog_value("index_dir"))
        index_dir.mkdir(parents=True, exist_ok=True)
        return str(index_dir / f"{self.vector_store_type}_{self.name}.index")

    def _init_vector_store(self, index_path: str):
        """Return a vector-store handle for this index via ``VectorStoreFactory``.

        ``handle_vector_store_params`` receives the catalog accessor itself
        (``self.catalog``, not its result), exactly as the original call sites did.
        """
        return VectorStoreFactory.init_vector_store(
            self.vector_store_type,
            self.name,
            **handle_vector_store_params(
                self.vector_store_type, index_path, self.catalog
            ),
        )

    def _create_evadb_index(self):
        """Build a new EvaDB-managed vector index, or update a matching one.

        If an index with the same name already exists:
        - without ``IF NOT EXISTS``, an error is raised;
        - with ``IF NOT EXISTS``, it is updated only when its feature column,
          function signature and vector store type all match. Otherwise the
          call logs a warning and returns without doing anything.

        Raises:
            ExecutorError: if the feature column is missing from the table, the
                index already exists without ``IF NOT EXISTS``, there are no
                rows to build a new index from, or populating/persisting/
                registering the index fails.
        """
        # Find function expression (if several are present, the last one wins).
        function_expression, function_expression_signature = None, None
        for project_expr in self.project_expr_list:
            if isinstance(project_expr, FunctionExpression):
                function_expression = project_expr
                function_expression_signature = project_expr.signature()

        # Get feature table and the catalog entry of the feature column.
        feat_tb_catalog_entry = self.table_ref.table.table_obj
        feat_col_name = self.col_list[0].name
        feat_col_catalog_entry = next(
            (col for col in feat_tb_catalog_entry.columns if col.name == feat_col_name),
            None,
        )
        if feat_col_catalog_entry is None:
            raise ExecutorError(
                f"Column {feat_col_name} not found in table "
                f"{self.table_ref.table.table_name}."
            )

        # When a function is applied, features are read from its output column.
        if function_expression is not None:
            feat_col_name = function_expression.output_objs[0].name

        index_catalog_entry = self.catalog().get_index_catalog_entry_by_name(self.name)
        index_path = self._get_evadb_index_save_path()

        index = None
        if index_catalog_entry is not None:
            msg = f"Index {self.name} already exists."
            if not self.if_not_exists:
                logger.error(msg)
                raise ExecutorError(msg)

            matches_existing = (
                index_catalog_entry.feat_column == feat_col_catalog_entry
                and index_catalog_entry.function_signature
                == function_expression_signature
                and index_catalog_entry.type == self.vector_store_type
            )
            if not matches_existing:
                # Skip index update if CREATE INDEX runs on a different index.
                logger.warning(msg)
                return

            # Only update index if everything matches.
            logger.warning(msg + " It will be updated on existing table.")
            index = self._init_vector_store(index_path)

        try:
            # Add features to index.
            for input_batch in self.children[0].exec():
                input_batch.drop_column_alias()
                feats = input_batch.column_as_numpy_array(feat_col_name)
                row_nums = input_batch.column_as_numpy_array(ROW_NUM_COLUMN)

                for row_num, feat in zip(row_nums, feats):
                    row_feat = feat.reshape(1, -1)

                    # Create the index lazily: its dimension is taken from the
                    # first feature row.
                    if index is None:
                        index = self._init_vector_store(index_path)
                        index.create(row_feat.shape[1])

                    # Row ID for mapping back to the row.
                    index.add([FeaturePayload(row_num, row_feat)])

            # Without an existing index and without any rows there is no
            # dimension to build an index from; fail with a clear message
            # instead of an AttributeError on ``None.persist()``.
            if index is None:
                raise ExecutorError(
                    f"Cannot create index {self.name}: no rows to index."
                )

            # Persist index.
            index.persist()

            # Save to catalog (only for newly created indexes).
            if index_catalog_entry is None:
                self.catalog().insert_index_catalog_entry(
                    self.name,
                    index_path,
                    self.vector_store_type,
                    feat_col_catalog_entry,
                    function_expression_signature,
                    self.index_def,
                )
        except Exception as e:
            # Delete index.
            # NOTE(review): on the update path this also deletes the
            # pre-existing index while its catalog entry is kept — confirm
            # this is intended.
            if index is not None:
                index.delete()

            # Throw exception back to user, keeping the original cause.
            raise ExecutorError(str(e)) from e
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc