• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 20a9a0f9-edcc-437c-815d-bcc1a2d22b17

10 Nov 2023 04:50AM UTC coverage: 66.644% (-10.2%) from 76.812%
20a9a0f9-edcc-437c-815d-bcc1a2d22b17

push

circleci

americast
update docs

0 of 1 new or added line in 1 file covered. (0.0%)

1354 existing lines in 113 files now uncovered.

8767 of 13155 relevant lines covered (66.64%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.25
/evadb/executor/vector_index_scan_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator
1✔
16

17
import pandas as pd
1✔
18

19
from evadb.catalog.models.utils import VectorStoreType
1✔
20
from evadb.catalog.sql_config import ROW_NUM_COLUMN
1✔
21
from evadb.database import EvaDBDatabase
1✔
22
from evadb.executor.abstract_executor import AbstractExecutor
1✔
23
from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params
1✔
24
from evadb.models.storage.batch import Batch
1✔
25
from evadb.plan_nodes.storage_plan import StoragePlan
1✔
26
from evadb.plan_nodes.vector_index_scan_plan import VectorIndexScanPlan
1✔
27
from evadb.third_party.databases.interface import get_database_handler
1✔
28
from evadb.third_party.vector_stores.types import VectorIndexQuery
1✔
29
from evadb.third_party.vector_stores.utils import VectorStoreFactory
1✔
30
from evadb.utils.logging_manager import logger
1✔
31

32

33
# Helper function for getting row_num column alias.
34
def get_row_num_column_alias(column_list):
    """Return the alias that qualifies the row-number column.

    Every entry of ``column_list`` is expected to look like
    ``"<alias>.<column>"``. The name is split on its *last* dot, so an alias
    that itself contains dots stays intact, and entries without any dot are
    skipped instead of raising ``ValueError`` while unpacking.

    Args:
        column_list: iterable of column-name strings.

    Returns:
        The alias of the first column whose unqualified name equals
        ``ROW_NUM_COLUMN``, or ``None`` when no column matches.
    """
    for column in column_list:
        alias, separator, col_name = column.rpartition(".")
        # An empty separator means the name carried no alias at all.
        if separator and col_name == ROW_NUM_COLUMN:
            return alias
    return None
×
39

40

41
class VectorIndexScanExecutor(AbstractExecutor):
    """Executor that answers a similarity search through a vector index.

    The plan node supplies the index (name, vector store type and feature
    column), the limit and the expression that produces the query vector.
    ``PGVECTOR`` indexes are scanned by sending a query to the backing
    database; every other vector store type is queried through
    ``VectorStoreFactory`` and the returned ids are joined against the rows
    produced by the child executor.
    """

    def __init__(self, db: EvaDBDatabase, node: VectorIndexScanPlan):
        super().__init__(db, node)

        self.index_name = node.index.name
        self.vector_store_type = node.index.type
        self.feat_column = node.index.feat_column
        self.limit_count = node.limit_count
        self.search_query_expr = node.search_query_expr

    def exec(self, *args, **kwargs) -> Iterator[Batch]:
        """Dispatch to the native or the EvaDB-managed scan.

        Returns:
            A generator of ``Batch`` objects holding the matching rows.
        """
        if self.vector_store_type == VectorStoreType.PGVECTOR:
            return self._native_vector_index_scan()
        else:
            return self._evadb_vector_index_scan(*args, **kwargs)

    def _get_search_query_results(self):
        """Evaluate the search expression and return the query vector.

        Returns:
            The first value of the expression's output column, reshaped to
            a single row (``reshape(1, -1)``).
        """
        # Get the query feature vector. The expression is evaluated on a
        # one-row dummy batch so that it yields a single result to retrieve.
        dummy_batch = Batch(
            frames=pd.DataFrame(
                {"0": [0]},
            )
        )
        search_batch = self.search_query_expr.evaluate(dummy_batch)

        # Scan index. The search batch comes from the Open call.
        feature_col_name = self.search_query_expr.output_objs[0].name
        search_batch.drop_column_alias()
        search_feat = search_batch.column_as_numpy_array(feature_col_name)[0]
        search_feat = search_feat.reshape(1, -1)
        return search_feat

    def _native_vector_index_scan(self):
        """Run the similarity search inside the backing database.

        Yields:
            One ``Batch`` with the rows returned by the native query, with
            its columns aliased by the table name.

        Raises:
            ExecutorError: if the native query reports an error.
        """
        search_feat = self._get_search_query_results()
        # The query embeds the vector as a flat Python list literal.
        search_feat = search_feat.reshape(-1).tolist()

        tb_catalog_entry = list(self.node.find_all(StoragePlan))[0].table
        db_catalog_entry = self.db.catalog().get_database_catalog_entry(
            tb_catalog_entry.database_name
        )
        with get_database_handler(
            db_catalog_entry.engine, **db_catalog_entry.params
        ) as handler:
            # NOTE(review): the statement is assembled by string formatting
            # from catalog names and the computed vector; confirm that none
            # of these can carry untrusted text.
            resp = handler.execute_native_query(
                f"""SELECT * FROM {tb_catalog_entry.name}
                                                ORDER BY {self.feat_column.name} <-> '{search_feat}'
                                                LIMIT {self.limit_count}"""
            )
            if resp.error is not None:
                raise ExecutorError(f"Native index scan encounters {resp.error}")
            res = Batch(frames=resp.data)
            res.modify_column_alias(tb_catalog_entry.name)
            yield res

    def _evadb_vector_index_scan(self, *args, **kwargs):
        """Query the vector store and join the hits with the child's rows.

        Yields:
            Exactly one ``Batch``. Its rows are the child rows whose
            row-number column appears among the ids returned by the index,
            ordered like those ids. When nothing matches, an empty batch is
            yielded (carrying the child's columns when the child produced at
            least one batch).
        """
        # Fetch the index from disk.
        index_catalog_entry = self.catalog().get_index_catalog_entry_by_name(
            self.index_name
        )
        self.index_path = index_catalog_entry.save_file_path
        self.index = VectorStoreFactory.init_vector_store(
            self.vector_store_type,
            self.index_name,
            **handle_vector_store_params(
                self.vector_store_type, self.index_path, self.db.catalog
            ),
        )

        search_feat = self._get_search_query_results()
        index_result = self.index.query(
            VectorIndexQuery(search_feat, self.limit_count.value)
        )
        # todo support queries over distance as well
        # distance_list = index_result.similarities
        row_num_np = index_result.ids

        # The index may hold fewer entries than requested; report it once.
        num_required_results = len(index_result.ids)
        if num_required_results < self.limit_count.value:
            logger.warning(
                f"The index {self.index_name} returned only {num_required_results} results, which is fewer than the required {self.limit_count.value}."
            )

        # Load projected columns from disk and join with search results.
        row_num_df = pd.DataFrame({"row_num_np": row_num_np})
        row_num_col_name = None
        empty_result = None
        matched_frames = []
        for batch in self.children[0].exec(**kwargs):
            if not row_num_col_name:
                # The qualified row-number column is resolved from the first
                # batch and reused for all following ones.
                row_num_alias = get_row_num_column_alias(batch.columns)
                row_num_col_name = "{}.{}".format(row_num_alias, ROW_NUM_COLUMN)

            frames = batch.frames
            if empty_result is None:
                # Zero-row template that keeps the child's columns.
                empty_result = frames.iloc[0:0]

            # Keep only the rows the index returned. Selecting with a mask
            # preserves the column dtypes, unlike a row-by-row dict rebuild.
            is_hit = frames[row_num_col_name].isin(row_num_df["row_num_np"])
            if is_hit.any():
                matched_frames.append(frames[is_hit])

        if not matched_frames:
            # Without any matching row there is no join column to merge on.
            yield Batch(empty_result if empty_result is not None else pd.DataFrame())
            return

        result_df = pd.concat(matched_frames, ignore_index=True)

        # The ids frame is the left side of the inner join, so the output
        # follows the order of the ids returned by the index.
        final_df = pd.merge(
            row_num_df,
            result_df,
            left_on="row_num_np",
            right_on=row_num_col_name,
            how="inner",
        )

        if "row_num_np" in final_df:
            del final_df["row_num_np"]
        yield Batch(final_df)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc