• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 7c1c5cb7-cb1e-4c69-985c-b642250357b6

28 Oct 2023 10:44PM UTC coverage: 76.956% (-1.7%) from 78.704%
7c1c5cb7-cb1e-4c69-985c-b642250357b6

push

circle-ci

xzdandy
Fix list

8990 of 11682 relevant lines covered (76.96%)

1.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.35
/evadb/executor/executor_utils.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import glob
2✔
16
import os
2✔
17
from pathlib import Path
2✔
18
from typing import TYPE_CHECKING, Generator, List, Union
2✔
19

20
from evadb.catalog.catalog_utils import xform_column_definitions_to_catalog_entries
2✔
21
from evadb.catalog.models.utils import TableCatalogEntry
2✔
22
from evadb.parser.create_statement import ColumnDefinition
2✔
23

24
if TYPE_CHECKING:
25
    from evadb.catalog.catalog_manager import CatalogManager
26

27
from evadb.catalog.catalog_type import TableType, VectorStoreType
2✔
28
from evadb.expression.abstract_expression import AbstractExpression
2✔
29
from evadb.expression.function_expression import FunctionExpression
2✔
30
from evadb.models.storage.batch import Batch
2✔
31
from evadb.parser.table_ref import TableInfo
2✔
32
from evadb.parser.types import FileFormatType
2✔
33
from evadb.readers.document.registry import SUPPORTED_TYPES
2✔
34
from evadb.utils.generic_utils import try_to_import_cv2
2✔
35
from evadb.utils.logging_manager import logger
2✔
36

37

38
class ExecutorError(Exception):
    """Exception type raised by the executor helpers in this module."""
40

41

42
def instrument_function_expression_cost(
    expr: Union[AbstractExpression, List[AbstractExpression]],
    catalog: "CatalogManager",
):
    """Persist the recorded cost of every function expression found in ``expr``.

    Args:
        expr: A single expression, a list of expressions, or None. None is a
            no-op.
        catalog: Catalog instance used to upsert the function cost entries.

    Note:
        We are expecting an instance of a catalog. An optimization can be to
        avoid creating a catalog instance if there is no function expression.
        An easy fix is to pass the function handler and create the catalog
        instance only if there is a function expression. In the past, this was
        problematic because of Ray. We can revisit it again.
    """
    if expr is None:
        return

    # Normalize to a list so a single expression and a list share one path.
    expressions = expr if isinstance(expr, list) else [expr]

    # persist stats of function expression
    for expression in expressions:
        for func_expr in expression.find_all(FunctionExpression):
            # Skip function expressions without a bound function or stats.
            if not func_expr.function_obj or not func_expr._stats:
                continue
            function_obj = func_expr.function_obj
            catalog.upsert_function_cost_catalog_entry(
                function_obj.row_id,
                function_obj.name,
                func_expr._stats.prev_cost,
            )
65

66

67
def apply_project(batch: Batch, project_list: List[AbstractExpression]):
    """Evaluate each projection expression on ``batch`` and merge the results.

    Args:
        batch: Input batch.
        project_list: Expressions to evaluate against the batch.

    Returns:
        The column-wise merge of the per-expression results, or ``batch``
        unchanged when it is empty or ``project_list`` is empty/None.
    """
    if batch.empty() or not project_list:
        return batch

    projected = [expr.evaluate(batch) for expr in project_list]
    return Batch.merge_column_wise(projected)
73

74

75
def apply_predicate(batch: Batch, predicate: AbstractExpression) -> Batch:
    """Filter ``batch`` with ``predicate`` and return the same batch object.

    Args:
        batch: Input batch; ``drop_zero`` and ``reset_index`` are called on it.
        predicate: Expression evaluated against the batch, or None for no
            filtering.

    Returns:
        ``batch`` itself, untouched when it is empty or ``predicate`` is None.
    """
    if batch.empty() or predicate is None:
        return batch

    mask = predicate.evaluate(batch)
    batch.drop_zero(mask)
    batch.reset_index()
    return batch
82

83

84
def handle_if_not_exists(
    catalog: "CatalogManager", table_info: TableInfo, if_not_exist=False
):
    """Check whether a table already exists and react per ``if_not_exist``.

    Args:
        catalog: Catalog used to look up the table.
        table_info: Table identifier; its ``table_name`` and ``database_name``
            are passed to ``catalog.check_table_exists``.
        if_not_exist: When true, an existing table is only logged as a
            warning instead of raising.

    Returns:
        False when the table does not exist; True when it exists and
        ``if_not_exist`` is true.

    Raises:
        ExecutorError: If the table exists and ``if_not_exist`` is false.
    """
    table_exists = catalog.check_table_exists(
        table_info.table_name,
        table_info.database_name,
    )
    if not table_exists:
        return False

    err_msg = "Table: {} already exists".format(table_info)
    if if_not_exist:
        # `warning` replaces the deprecated `warn` alias and matches the
        # other helpers in this module.
        logger.warning(err_msg)
        return True

    logger.error(err_msg)
    raise ExecutorError(err_msg)
102

103

104
def validate_image(image_path: Path) -> bool:
    """Return True when OpenCV can read ``image_path`` as an image.

    Any exception (including a failed cv2 import) is logged as a warning and
    reported as False.
    """
    try:
        try_to_import_cv2()
        import cv2

        # cv2.imread yields None when the file cannot be decoded.
        return cv2.imread(str(image_path)) is not None
    except Exception as e:
        message = (
            f"Unexpected Exception {e} occurred while reading image file {image_path}"
        )
        logger.warning(message)
        return False
116

117

118
def iter_path_regex(path_regex: Path) -> Generator[str, None, None]:
    """Lazily yield the paths matching a glob pattern.

    A leading ``~`` is expanded to the user's home directory, and ``**`` is
    matched recursively.
    """
    expanded_pattern = os.path.expanduser(path_regex)
    return glob.iglob(expanded_pattern, recursive=True)
120

121

122
def validate_video(video_path: Path) -> bool:
    """Return True when OpenCV can open ``video_path`` as a video.

    Any exception (including a failed cv2 import) is logged as a warning and
    reported as False, matching ``validate_image``.
    """
    try:
        try_to_import_cv2()
        import cv2

        vid = cv2.VideoCapture(str(video_path))
        try:
            return bool(vid.isOpened())
        finally:
            # Release the capture handle explicitly instead of relying on
            # garbage collection.
            vid.release()
    except Exception as e:
        logger.warning(
            f"Unexpected Exception {e} occurred while reading video file {video_path}"
        )
        # Previously this path fell through and returned None despite the
        # declared bool return type.
        return False
135

136

137
def validate_document(doc_path: Path) -> bool:
    """Return True when the file extension of ``doc_path`` is in SUPPORTED_TYPES."""
    extension = doc_path.suffix
    return extension in SUPPORTED_TYPES
139

140

141
def validate_pdf(doc_path: Path) -> bool:
    """Return True when ``doc_path`` ends with the exact suffix ``.pdf``.

    The comparison is case-sensitive and looks only at the file name, not at
    the file contents.
    """
    extension = doc_path.suffix
    return extension == ".pdf"
143

144

145
def validate_media(file_path: Path, media_type: FileFormatType) -> bool:
    """Validate ``file_path`` with the validator matching ``media_type``.

    Args:
        file_path: Path of the file to validate.
        media_type: One of VIDEO, IMAGE, DOCUMENT or PDF.

    Returns:
        The result of the selected validator.

    Raises:
        ValueError: If ``media_type`` is none of the supported types.
    """
    # Ordered (type, validator) pairs, checked in this order.
    validators = (
        (FileFormatType.VIDEO, validate_video),
        (FileFormatType.IMAGE, validate_image),
        (FileFormatType.DOCUMENT, validate_document),
        (FileFormatType.PDF, validate_pdf),
    )
    for supported_type, validator in validators:
        if media_type == supported_type:
            return validator(file_path)

    raise ValueError(f"Unsupported Media type {str(media_type)}")
156

157

158
def handle_vector_store_params(
    vector_store_type: VectorStoreType, index_path: str, catalog
) -> dict:
    """Build the keyword parameters needed to set up a vector store.

    Args:
        vector_store_type (VectorStoreType): The type of vector store.
        index_path (str): The path to store the index.
        catalog: Zero-argument callable returning an object that exposes
            ``get_configuration_catalog_value``; only called for PINECONE
            and MILVUS.

    Returns:
        dict: Dictionary containing the appropriate vector store parameters.

    Raises:
        ValueError: If the vector store type in the node is not supported.
    """
    if vector_store_type == VectorStoreType.FAISS:
        return {"index_path": index_path}

    if vector_store_type == VectorStoreType.QDRANT:
        return {"index_db": str(Path(index_path).parent)}

    if vector_store_type == VectorStoreType.CHROMADB:
        return {"index_path": str(Path(index_path).parent)}

    if vector_store_type == VectorStoreType.PINECONE:
        # add the required API_KEYS
        pinecone_keys = ("PINECONE_API_KEY", "PINECONE_ENV")
        return {
            key: catalog().get_configuration_catalog_value(key)
            for key in pinecone_keys
        }

    if vector_store_type == VectorStoreType.MILVUS:
        milvus_keys = (
            "MILVUS_URI",
            "MILVUS_USER",
            "MILVUS_PASSWORD",
            "MILVUS_DB_NAME",
            "MILVUS_TOKEN",
        )
        return {
            key: catalog().get_configuration_catalog_value(key)
            for key in milvus_keys
        }

    raise ValueError("Unsupported vector store type: {}".format(vector_store_type))
202

203

204
def create_table_catalog_entry_for_native_table(
    table_info: TableInfo, column_list: List[ColumnDefinition]
):
    """Build a ``TableCatalogEntry`` of type NATIVE_DATA from column definitions.

    Args:
        table_info: Supplies the table name and database name.
        column_list: Column definitions converted to catalog entries.

    Returns:
        The assembled ``TableCatalogEntry`` with ``file_url`` set to None.
    """
    columns = xform_column_definitions_to_catalog_entries(column_list)

    # Assemble table.
    return TableCatalogEntry(
        name=table_info.table_name,
        file_url=None,
        table_type=TableType.NATIVE_DATA,
        columns=columns,
        database_name=table_info.database_name,
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc