• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #820

01 Oct 2023 02:22AM UTC coverage: 0.0% (-73.7%) from 73.748%
#820

push

circle-ci

Jiashen Cao
fix lint error

0 of 12361 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/executor/create_index_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from pathlib import Path
×
16

17
import pandas as pd
×
18

19
from evadb.catalog.catalog_type import VectorStoreType
×
20
from evadb.catalog.sql_config import ROW_NUM_COLUMN
×
21
from evadb.database import EvaDBDatabase
×
22
from evadb.executor.abstract_executor import AbstractExecutor
×
23
from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params
×
24
from evadb.expression.function_expression import FunctionExpression
×
25
from evadb.models.storage.batch import Batch
×
26
from evadb.plan_nodes.create_index_plan import CreateIndexPlan
×
27
from evadb.third_party.databases.interface import get_database_handler
×
28
from evadb.third_party.vector_stores.types import FeaturePayload
×
29
from evadb.third_party.vector_stores.utils import VectorStoreFactory
×
30
from evadb.utils.logging_manager import logger
×
31

32

33
class CreateIndexExecutor(AbstractExecutor):
    """Executor for a ``CreateIndexPlan``.

    Plans whose vector store type is ``VectorStoreType.PGVECTOR`` are handled
    by issuing a ``CREATE INDEX`` statement through the database handler of
    the table's database. Every other vector store type is built through
    ``VectorStoreFactory``, persisted, and registered in the catalog.
    """

    def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan):
        super().__init__(db, node)

    def exec(self, *args, **kwargs):
        """Create the index, then yield one batch holding a success message.

        This is a generator: no work happens until it is iterated.

        Raises:
            ExecutorError: if either creation path fails.
        """
        # Vector type specific creation.
        if self.node.vector_store_type == VectorStoreType.PGVECTOR:
            self._create_native_index()
        else:
            self._create_evadb_index()

        yield Batch(
            pd.DataFrame(
                [f"Index {self.node.name} successfully added to the database."]
            )
        )

    def _create_native_index(self):
        """Create the index through the native storage engine.

        Raises:
            ExecutorError: if the handler's response carries an error.
        """
        table = self.node.table_ref.table
        db_catalog_entry = self.catalog().get_database_catalog_entry(
            table.database_name
        )

        with get_database_handler(
            db_catalog_entry.engine, **db_catalog_entry.params
        ) as handler:
            # As other libraries, we default to HNSW and L2 distance.
            # NOTE(review): this path never consults `self.node.if_not_exists`
            # and interpolates identifiers directly into the SQL text; confirm
            # both are acceptable for the supported native engines.
            resp = handler.execute_native_query(
                f"""CREATE INDEX {self.node.name} ON {table.table_name}
                    USING hnsw ({self.node.col_list[0].name} vector_l2_ops)"""
            )
            if resp.error is not None:
                raise ExecutorError(
                    f"Native engine create index encounters error: {resp.error}"
                )

    def _get_evadb_index_save_path(self) -> str:
        """Return the on-disk saving path for the EvaDB index, as a string.

        The directory comes from the ``storage`` / ``index_dir`` config value
        and is created when it does not exist yet. The file name is
        ``<vector_store_type>_<index_name>.index``.
        """
        index_dir = Path(self.config.get_value("storage", "index_dir"))
        if not index_dir.exists():
            index_dir.mkdir(parents=True, exist_ok=True)
        return str(
            index_dir
            / Path("{}_{}.index".format(self.node.vector_store_type, self.node.name))
        )

    def _create_evadb_index(self):
        """Build, persist, and register an EvaDB-managed index.

        Returns early (with a warning) when the index already exists and the
        plan was created with ``if_not_exists``.

        Raises:
            ExecutorError: if the index already exists and ``if_not_exists``
                is not set, if the feature column cannot be found, if the
                child executor yields no rows, or if any step of building,
                persisting, or cataloguing the index fails.
        """
        if self.catalog().get_index_catalog_entry_by_name(self.node.name):
            msg = f"Index {self.node.name} already exists."
            if self.node.if_not_exists:
                logger.warning(msg)
                return
            logger.error(msg)
            raise ExecutorError(msg)

        index = None
        index_path = self._get_evadb_index_save_path()

        try:
            # Get feature tables.
            feat_catalog_entry = self.node.table_ref.table.table_obj

            # Get feature column.
            feat_col_name = self.node.col_list[0].name
            feat_column = next(
                (
                    col
                    for col in feat_catalog_entry.columns
                    if col.name == feat_col_name
                ),
                None,
            )
            if feat_column is None:
                raise ExecutorError(
                    f"Column {feat_col_name} not found; cannot create index "
                    f"{self.node.name}."
                )

            # Find function expression (the last one in the projection wins).
            function_expression = None
            for project_expr in self.node.project_expr_list:
                if isinstance(project_expr, FunctionExpression):
                    function_expression = project_expr

            # When the index is built on a function's output, features are
            # read from that output column instead of the table column.
            if function_expression is not None:
                feat_col_name = function_expression.output_objs[0].name

            # Add features to index.
            # TODO: batch size is hardcoded for now.
            for input_batch in self.children[0].exec():
                input_batch.drop_column_alias()
                feat = input_batch.column_as_numpy_array(feat_col_name)
                row_num = input_batch.column_as_numpy_array(ROW_NUM_COLUMN)

                for row_id, row in zip(row_num, feat):
                    row_feat = row.reshape(1, -1)
                    if index is None:
                        # The store is created lazily because its dimension
                        # is only known once the first feature is seen.
                        index = VectorStoreFactory.init_vector_store(
                            self.node.vector_store_type,
                            self.node.name,
                            **handle_vector_store_params(
                                self.node.vector_store_type, index_path
                            ),
                        )
                        index.create(row_feat.shape[1])

                    # Row ID for mapping back to the row.
                    index.add([FeaturePayload(row_id, row_feat)])

            # Without any input row no store was ever initialised; fail with
            # a clear message rather than an AttributeError on ``None``.
            if index is None:
                raise ExecutorError(
                    f"Index {self.node.name} cannot be created: "
                    "the input produced no rows to index."
                )

            # Persist index.
            index.persist()

            # Save to catalog.
            self.catalog().insert_index_catalog_entry(
                self.node.name,
                index_path,
                self.node.vector_store_type,
                feat_column,
                function_expression.signature()
                if function_expression is not None
                else None,
            )
        except Exception as e:
            # Delete the partially built index, if one was initialised.
            if index is not None:
                index.delete()

            # Throw exception back to user, keeping the original as the cause.
            raise ExecutorError(str(e)) from e
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc