• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #820

01 Oct 2023 02:22AM UTC coverage: 0.0% (-73.7%) from 73.748%
#820

push

circle-ci

Jiashen Cao
fix lint error

0 of 12361 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/executor/executor_utils.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import glob
×
16
import os
×
17
from pathlib import Path
×
18
from typing import TYPE_CHECKING, Generator, List
×
19

20
from evadb.catalog.catalog_utils import xform_column_definitions_to_catalog_entries
×
21
from evadb.catalog.models.utils import TableCatalogEntry
×
22
from evadb.parser.create_statement import ColumnDefinition
×
23

24
if TYPE_CHECKING:
25
    from evadb.catalog.catalog_manager import CatalogManager
26

27
from evadb.catalog.catalog_type import TableType, VectorStoreType
×
28
from evadb.expression.abstract_expression import AbstractExpression
×
29
from evadb.expression.function_expression import FunctionExpression
×
30
from evadb.models.storage.batch import Batch
×
31
from evadb.parser.table_ref import TableInfo
×
32
from evadb.parser.types import FileFormatType
×
33
from evadb.readers.document.registry import SUPPORTED_TYPES
×
34
from evadb.utils.generic_utils import try_to_import_cv2
×
35
from evadb.utils.logging_manager import logger
×
36

37

38
class ExecutorError(Exception):
    """Error raised by executor helpers in this module.

    For example, ``handle_if_not_exists`` raises it when a table already
    exists and the caller did not ask for that to be tolerated.
    """
×
40

41

42
def apply_project(
    batch: Batch, project_list: List[AbstractExpression], catalog: "CatalogManager"
):
    """Evaluate the projection expressions against ``batch``.

    Each expression in ``project_list`` is evaluated on the input batch and
    the per-expression results are merged column-wise into one batch. After
    that, for every function expression found inside the projections that has
    both a function object and stats, the previous cost is written to the
    catalog through ``upsert_function_cost_catalog_entry``.

    Args:
        batch: Input batch; returned untouched when it is empty.
        project_list: Expressions to evaluate; when falsy the batch is
            returned untouched.
        catalog: Catalog used to persist function cost entries.

    Returns:
        The merged batch, or the input batch when there was nothing to do.
    """
    # Nothing to project: hand the input straight back.
    if batch.empty() or not project_list:
        return batch

    projected = [projection.evaluate(batch) for projection in project_list]
    batch = Batch.merge_column_wise(projected)

    # Persist stats of the function expressions once evaluation is complete.
    for projection in project_list:
        for function_expr in projection.find_all(FunctionExpression):
            if not (function_expr.function_obj and function_expr._stats):
                continue
            function_obj = function_expr.function_obj
            catalog.upsert_function_cost_catalog_entry(
                function_obj.row_id,
                function_obj.name,
                function_expr._stats.prev_cost,
            )
    return batch
×
60

61

62
def apply_predicate(
    batch: Batch, predicate: AbstractExpression, catalog: "CatalogManager"
) -> Batch:
    """Filter ``batch`` using ``predicate``.

    The predicate is evaluated on the batch and its outcomes are handed to
    ``batch.drop_zero`` followed by ``batch.reset_index``.
    NOTE(review): both calls discard their return value, so they presumably
    modify ``batch`` in place -- confirm against ``Batch``.

    Afterwards, for every function expression inside the predicate that has
    both a function object and stats, the previous cost is written to the
    catalog through ``upsert_function_cost_catalog_entry``.

    Args:
        batch: Input batch; returned untouched when it is empty.
        predicate: Expression to evaluate; ``None`` disables filtering.
        catalog: Catalog used to persist function cost entries.

    Returns:
        The same ``batch`` object that was passed in.
    """
    # Nothing to filter: hand the input straight back.
    if batch.empty() or predicate is None:
        return batch

    mask = predicate.evaluate(batch)
    batch.drop_zero(mask)
    batch.reset_index()

    # Persist stats of the function expressions once evaluation is complete.
    for function_expr in predicate.find_all(FunctionExpression):
        if not (function_expr.function_obj and function_expr._stats):
            continue
        function_obj = function_expr.function_obj
        catalog.upsert_function_cost_catalog_entry(
            function_obj.row_id, function_obj.name, function_expr._stats.prev_cost
        )
    return batch
×
78

79

80
def handle_if_not_exists(
    catalog: "CatalogManager", table_info: TableInfo, if_not_exist=False
):
    """Report whether the table described by ``table_info`` already exists.

    Args:
        catalog: Catalog queried through ``check_table_exists``.
        table_info: Supplies the ``table_name`` and ``database_name`` to look up.
        if_not_exist: When truthy, an existing table is tolerated: a warning
            is logged and ``True`` is returned instead of raising.

    Returns:
        bool: ``False`` when the table does not exist; ``True`` when it
        exists and ``if_not_exist`` is truthy.

    Raises:
        ExecutorError: If the table exists and ``if_not_exist`` is falsy.
    """
    table_exists = catalog.check_table_exists(
        table_info.table_name,
        table_info.database_name,
    )
    if not table_exists:
        return False

    err_msg = "Table: {} already exists".format(table_info)
    if if_not_exist:
        # Use ``warning`` like the other call sites in this module; ``warn``
        # is a deprecated alias on standard-library loggers.
        logger.warning(err_msg)
        return True

    logger.error(err_msg)
    raise ExecutorError(err_msg)
×
98

99

100
def validate_image(image_path: Path) -> bool:
    """Return True when OpenCV can read ``image_path`` as an image.

    ``cv2.imread`` returning ``None`` counts as invalid. Any exception raised
    along the way (including from the cv2 import helper) is logged as a
    warning and also reported as invalid.
    """
    try:
        try_to_import_cv2()
        import cv2

        return cv2.imread(str(image_path)) is not None
    except Exception as e:
        logger.warning(
            f"Unexpected Exception {e} occurred while reading image file {image_path}"
        )
    return False
112

113

114
def iter_path_regex(path_regex: Path) -> Generator[str, None, None]:
    """Lazily yield the paths matching the glob pattern ``path_regex``.

    A leading ``~`` is expanded to the user's home directory before matching,
    and matching is recursive, so ``**`` may span nested directories.
    """
    expanded_pattern = os.path.expanduser(path_regex)
    return glob.iglob(expanded_pattern, recursive=True)
×
116

117

118
def validate_video(video_path: Path) -> bool:
    """Return True when OpenCV can open ``video_path`` as a video.

    Any exception raised along the way (including from the cv2 import helper)
    is logged as a warning and reported as ``False``, matching
    ``validate_image``. Previously that path fell off the end of the function
    and returned ``None``.

    Args:
        video_path: Location of the candidate video file.

    Returns:
        bool: ``True`` if the capture opened successfully, ``False`` otherwise.
    """
    try:
        try_to_import_cv2()
        import cv2

        vid = cv2.VideoCapture(str(video_path))
        try:
            return bool(vid.isOpened())
        finally:
            # Release the underlying file/decoder handle right away instead
            # of leaving it to garbage collection.
            vid.release()
    except Exception as e:
        logger.warning(
            f"Unexpected Exception {e} occurred while reading video file {video_path}"
        )
        return False
131

132

133
def validate_document(doc_path: Path) -> bool:
    """Return True when the suffix of ``doc_path`` is in ``SUPPORTED_TYPES``."""
    extension = doc_path.suffix
    return extension in SUPPORTED_TYPES
×
135

136

137
def validate_pdf(doc_path: Path) -> bool:
    """Return True when ``doc_path`` ends with the exact suffix ``.pdf``.

    The comparison is case-sensitive and only inspects the file name; the
    file contents are never read.
    """
    extension = doc_path.suffix
    return extension == ".pdf"
×
139

140

141
def validate_media(file_path: Path, media_type: FileFormatType) -> bool:
    """Validate ``file_path`` with the validator matching ``media_type``.

    Args:
        file_path: Location of the file to validate.
        media_type: One of ``VIDEO``, ``IMAGE``, ``DOCUMENT`` or ``PDF``.

    Returns:
        bool: Whatever the selected validator reports.

    Raises:
        ValueError: If ``media_type`` is none of the handled formats.
    """
    if media_type == FileFormatType.VIDEO:
        return validate_video(file_path)
    if media_type == FileFormatType.IMAGE:
        return validate_image(file_path)
    if media_type == FileFormatType.DOCUMENT:
        return validate_document(file_path)
    if media_type == FileFormatType.PDF:
        return validate_pdf(file_path)
    raise ValueError(f"Unsupported Media type {str(media_type)}")
152

153

154
def handle_vector_store_params(
    vector_store_type: VectorStoreType, index_path: str
) -> dict:
    """Build the keyword parameters for the given vector store type.

    Args:
        vector_store_type (VectorStoreType): The type of vector store.
        index_path (str): The path to store the index.

    Returns:
        dict: ``{"index_path": index_path}`` for FAISS,
        ``{"index_db": <parent directory of index_path>}`` for QDRANT,
        ``{"index_path": <parent directory of index_path>}`` for CHROMADB,
        and an empty dict for PINECONE.

    Raises:
        ValueError: If the vector store type is not one of the above.
    """
    if vector_store_type == VectorStoreType.FAISS:
        return {"index_path": index_path}

    if vector_store_type == VectorStoreType.QDRANT:
        # The parent directory is computed only in the branches that use it,
        # so ``index_path`` is never parsed for the other store types.
        parent_dir = str(Path(index_path).parent)
        return {"index_db": parent_dir}

    if vector_store_type == VectorStoreType.CHROMADB:
        parent_dir = str(Path(index_path).parent)
        return {"index_path": parent_dir}

    if vector_store_type == VectorStoreType.PINECONE:
        return {}

    raise ValueError("Unsupported vector store type: {}".format(vector_store_type))
180

181

182
def create_table_catalog_entry_for_native_table(
    table_info: TableInfo, column_list: List[ColumnDefinition]
):
    """Build a ``TableCatalogEntry`` of type ``NATIVE_DATA``.

    The column definitions are first converted into catalog entries; the
    table entry then takes its name and database name from ``table_info``
    and is created with ``file_url=None``.

    Args:
        table_info: Supplies ``table_name`` and ``database_name``.
        column_list: Column definitions to convert into catalog entries.

    Returns:
        The assembled ``TableCatalogEntry``.
    """
    catalog_columns = xform_column_definitions_to_catalog_entries(column_list)

    return TableCatalogEntry(
        name=table_info.table_name,
        file_url=None,
        table_type=TableType.NATIVE_DATA,
        columns=catalog_columns,
        database_name=table_info.database_name,
    )
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc