• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / a4c010ba-78be-4818-8e6f-1da08c6af280

31 Aug 2023 11:59PM UTC coverage: 70.992% (-10.6%) from 81.552%
a4c010ba-78be-4818-8e6f-1da08c6af280

push

circle-ci

web-flow
Merge branch 'staging' into evadb_staging

54 of 54 new or added lines in 3 files covered. (100.0%)

8020 of 11297 relevant lines covered (70.99%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.3
/evadb/executor/vector_index_scan_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator
1✔
16

17
import pandas as pd
1✔
18

19
from evadb.catalog.sql_config import IDENTIFIER_COLUMN
1✔
20
from evadb.database import EvaDBDatabase
1✔
21
from evadb.executor.abstract_executor import AbstractExecutor
1✔
22
from evadb.executor.executor_utils import handle_vector_store_params
1✔
23
from evadb.models.storage.batch import Batch
1✔
24
from evadb.plan_nodes.vector_index_scan_plan import VectorIndexScanPlan
1✔
25
from evadb.third_party.vector_stores.types import VectorIndexQuery
1✔
26
from evadb.third_party.vector_stores.utils import VectorStoreFactory
1✔
27
from evadb.utils.logging_manager import logger
1✔
28

29

30
# Helper function for getting row_id column alias.
31
def get_row_id_column_alias(column_list):
    """Return the alias that qualifies the row-identifier column.

    Every entry of *column_list* is expected to be a qualified column name of
    the form ``"<alias>.<column>"``. The name is split on its LAST dot, so an
    alias that itself contains dots is kept whole, and a name without any dot
    is skipped instead of raising ``ValueError`` during tuple unpacking.

    Args:
        column_list: iterable of column-name strings.

    Returns:
        The alias of the first column whose unqualified name equals
        ``IDENTIFIER_COLUMN``, or ``None`` when no column matches.
    """
    for column in column_list:
        alias, separator, col_name = column.rpartition(".")
        # An empty separator means the name carried no alias at all, so it
        # cannot be the "<alias>.<identifier>" column we are looking for.
        if separator and col_name == IDENTIFIER_COLUMN:
            return alias
    return None
×
36

37

38
class VectorIndexScanExecutor(AbstractExecutor):
    """Executor for a vector-index scan plan node.

    It evaluates the search expression to obtain a query feature vector,
    queries the named vector index for the closest ids, and then joins those
    ids against the rows produced by its first child executor.
    """

    def __init__(self, db: EvaDBDatabase, node: VectorIndexScanPlan):
        """Store the index name, limit and search expression from *node*."""
        super().__init__(db, node)

        self.index_name = node.index_name
        self.limit_count = node.limit_count
        self.search_query_expr = node.search_query_expr

    def exec(self, *args, **kwargs) -> Iterator[Batch]:
        """Query the vector index and yield the matching child rows.

        Keyword arguments are forwarded to the child executor's ``exec``.

        Yields:
            A single ``Batch`` whose rows follow the order of the ids returned
            by the index. Ids that are not found among the child's rows are
            omitted from the result.
        """
        # Fetch the index from disk.
        index_catalog_entry = self.catalog().get_index_catalog_entry_by_name(
            self.index_name
        )
        self.index_path = index_catalog_entry.save_file_path
        self.index = VectorStoreFactory.init_vector_store(
            self.node.vector_store_type,
            self.index_name,
            **handle_vector_store_params(self.node.vector_store_type, self.index_path),
        )

        # Get the query feature vector. Create a dummy
        # batch to retrieve a single file path.
        dummy_batch = Batch(frames=pd.DataFrame({"0": [0]}))
        search_batch = self.search_query_expr.evaluate(dummy_batch)

        # Scan index. The search batch comes from the Open call.
        feature_col_name = self.search_query_expr.output_objs[0].name
        search_batch.drop_column_alias()
        search_feat = search_batch.column_as_numpy_array(feature_col_name)[0]
        search_feat = search_feat.reshape(1, -1)
        index_result = self.index.query(
            VectorIndexQuery(search_feat, self.limit_count.value)
        )
        # todo support queries over distance as well
        # distance_list = index_result.similarities
        row_id_np = index_result.ids

        # handle the case where the index_results are less than self.limit_count.value
        num_required_results = self.limit_count.value
        if len(row_id_np) < self.limit_count.value:
            num_required_results = len(row_id_np)
            logger.warning(
                f"The index {self.index_name} returned only {num_required_results} results, which is fewer than the required {self.limit_count.value}."
            )

        # Map every returned id to the result position(s) it occupies, so each
        # child row needs one dictionary lookup instead of a scan over all
        # ids. Ids beyond the requested limit are ignored, which also keeps
        # every position inside res_row_list.
        positions_by_row_id = {}
        for idx, rid in enumerate(row_id_np[:num_required_results]):
            positions_by_row_id.setdefault(rid, []).append(idx)

        # Load projected columns from disk and join with search results.
        row_id_col_name = None
        res_row_list = [None] * num_required_results
        for batch in self.children[0].exec(**kwargs):
            column_list = batch.columns
            if not row_id_col_name:
                row_id_alias = get_row_id_column_alias(column_list)
                row_id_col_name = "{}.{}".format(row_id_alias, IDENTIFIER_COLUMN)

            # The index returned nothing, so no row can match; keep draining
            # the child but skip the per-row work.
            if not positions_by_row_id:
                continue

            for _, row in batch.frames.iterrows():
                positions = positions_by_row_id.get(row[row_id_col_name])
                if positions is None:
                    continue
                res_row = {col_name: row[col_name] for col_name in column_list}
                for idx in positions:
                    res_row_list[idx] = res_row

        # Ids reported by the index but absent from the child scan leave None
        # placeholders behind; drop them so the DataFrame is built only from
        # real rows.
        matched_rows = [res_row for res_row in res_row_list if res_row is not None]
        yield Batch(pd.DataFrame(matched_rows))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc