• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / b85b58ac-76a4-45e2-aa21-70891b3ac726

20 Sep 2023 04:52PM UTC coverage: 80.039% (+10.7%) from 69.369%
b85b58ac-76a4-45e2-aa21-70891b3ac726

push

circle-ci

jiashenC
trigger build

9543 of 11923 relevant lines covered (80.04%)

1.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.36
/evadb/executor/create_index_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from pathlib import Path
2✔
16

17
import pandas as pd
2✔
18

19
from evadb.catalog.sql_config import ROW_NUM_COLUMN
2✔
20
from evadb.database import EvaDBDatabase
2✔
21
from evadb.executor.abstract_executor import AbstractExecutor
2✔
22
from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params
2✔
23
from evadb.models.storage.batch import Batch
2✔
24
from evadb.plan_nodes.create_index_plan import CreateIndexPlan
2✔
25
from evadb.storage.storage_engine import StorageEngine
2✔
26
from evadb.third_party.vector_stores.types import FeaturePayload
2✔
27
from evadb.third_party.vector_stores.utils import VectorStoreFactory
2✔
28
from evadb.utils.logging_manager import logger
2✔
29

30

31
class CreateIndexExecutor(AbstractExecutor):
2✔
32
    def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan):
2✔
33
        super().__init__(db, node)
2✔
34

35
    def exec(self, *args, **kwargs):
2✔
36
        if self.catalog().get_index_catalog_entry_by_name(self.node.name):
1✔
37
            msg = f"Index {self.node.name} already exists."
×
38
            if self.node.if_not_exists:
×
39
                logger.warn(msg)
×
40
                return
×
41
            else:
42
                logger.error(msg)
×
43
                raise ExecutorError(msg)
44

45
        self.index_path = self._get_index_save_path()
1✔
46
        self.index = None
1✔
47
        self._create_index()
1✔
48

49
        yield Batch(
1✔
50
            pd.DataFrame(
51
                [f"Index {self.node.name} successfully added to the database."]
52
            )
53
        )
54

55
    def _get_index_save_path(self) -> Path:
2✔
56
        index_dir = Path(self.config.get_value("storage", "index_dir"))
1✔
57
        if not index_dir.exists():
1✔
58
            index_dir.mkdir(parents=True, exist_ok=True)
×
59
        return str(
1✔
60
            index_dir
61
            / Path("{}_{}.index".format(self.node.vector_store_type, self.node.name))
62
        )
63

64
    def _create_index(self):
2✔
65
        try:
1✔
66
            # Get feature tables.
67
            feat_catalog_entry = self.node.table_ref.table.table_obj
1✔
68

69
            # Get feature column.
70
            feat_col_name = self.node.col_list[0].name
1✔
71
            feat_column = [
1✔
72
                col for col in feat_catalog_entry.columns if col.name == feat_col_name
73
            ][0]
74

75
            # Add features to index.
76
            # TODO: batch size is hardcoded for now.
77
            input_dim = -1
1✔
78
            storage_engine = StorageEngine.factory(self.db, feat_catalog_entry)
1✔
79
            for input_batch in storage_engine.read(feat_catalog_entry):
1✔
80
                if self.node.function:
1✔
81
                    # Create index through function expression.
82
                    # Function(input column) -> 2 dimension feature vector.
83
                    input_batch.modify_column_alias(feat_catalog_entry.name.lower())
×
84
                    feat_batch = self.node.function.evaluate(input_batch)
×
85
                    feat_batch.drop_column_alias()
×
86
                    input_batch.drop_column_alias()
×
87
                    feat = feat_batch.column_as_numpy_array("features")
×
88
                else:
89
                    # Create index on the feature table directly.
90
                    # Pandas wraps numpy array as an object inside a numpy
91
                    # array. Use zero index to get the actual numpy array.
92
                    feat = input_batch.column_as_numpy_array(feat_col_name)
1✔
93

94
                row_num = input_batch.column_as_numpy_array(ROW_NUM_COLUMN)
1✔
95

96
                for i in range(len(input_batch)):
1✔
97
                    row_feat = feat[i].reshape(1, -1)
1✔
98
                    if self.index is None:
1✔
99
                        input_dim = row_feat.shape[1]
1✔
100
                        self.index = VectorStoreFactory.init_vector_store(
1✔
101
                            self.node.vector_store_type,
102
                            self.node.name,
103
                            **handle_vector_store_params(
104
                                self.node.vector_store_type, self.index_path
105
                            ),
106
                        )
107
                        self.index.create(input_dim)
1✔
108

109
                    # Row ID for mapping back to the row.
110
                    self.index.add([FeaturePayload(row_num[i], row_feat)])
1✔
111

112
            # Persist index.
113
            self.index.persist()
1✔
114

115
            # Save to catalog.
116
            self.catalog().insert_index_catalog_entry(
1✔
117
                self.node.name,
118
                self.index_path,
119
                self.node.vector_store_type,
120
                feat_column,
121
                self.node.function.signature() if self.node.function else None,
122
            )
123
        except Exception as e:
124
            # Delete index.
125
            if self.index:
126
                self.index.delete()
127

128
            # Throw exception back to user.
129
            raise ExecutorError(str(e))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc