• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / f3cb8bbb-8165-49fa-af41-b587f634b3c4

pending completion
f3cb8bbb-8165-49fa-af41-b587f634b3c4

Pull #814

circle-ci

jiashenC
update results
Pull Request #814: feat: benchmark question answering v1

42 of 42 new or added lines in 2 files covered. (100.0%)

10095 of 10421 relevant lines covered (96.87%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.39
/eva/optimizer/optimizer_utils.py
1
# coding=utf-8
2
# Copyright 2018-2023 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import List, Tuple
1✔
16

17
from eva.catalog.catalog_manager import CatalogManager
1✔
18
from eva.catalog.catalog_utils import get_table_primary_columns
1✔
19
from eva.catalog.models.column_catalog import ColumnCatalogEntry
1✔
20
from eva.catalog.models.udf_io_catalog import UdfIOCatalogEntry
1✔
21
from eva.catalog.models.udf_metadata_catalog import UdfMetadataCatalogEntry
1✔
22
from eva.constants import CACHEABLE_UDFS, DEFAULT_FUNCTION_EXPRESSION_COST
1✔
23
from eva.expression.abstract_expression import AbstractExpression, ExpressionType
1✔
24
from eva.expression.expression_utils import (
1✔
25
    conjunction_list_to_expression_tree,
26
    contains_single_column,
27
    get_columns_in_predicate,
28
    is_simple_predicate,
29
    to_conjunction_list,
30
)
31
from eva.expression.function_expression import (
1✔
32
    FunctionExpression,
33
    FunctionExpressionCache,
34
)
35
from eva.expression.tuple_value_expression import TupleValueExpression
1✔
36
from eva.parser.alias import Alias
1✔
37
from eva.parser.create_statement import ColumnDefinition
1✔
38
from eva.utils.kv_cache import DiskKVCache
1✔
39

40

41
def column_definition_to_udf_io(col_list: List[ColumnDefinition], is_input: bool):
    """Build one UdfIOCatalogEntry per parsed column definition.

    Arguments:
        col_list(List[ColumnDefinition]): parsed input/output definitions. A
            bare ColumnDefinition is also accepted and handled as a
            one-element list.
        is_input(bool): true if the columns are inputs, false otherwise

    Returns:
        list: the created UdfIOCatalogEntry objects, in the order of col_list

    Raises:
        AssertionError: if any element of col_list is None
    """
    # Normalize the single-definition case so the loop below is uniform.
    definitions = [col_list] if isinstance(col_list, ColumnDefinition) else col_list

    io_entries = []
    for definition in definitions:
        assert definition is not None, "Empty column definition while creating udf io"
        entry = UdfIOCatalogEntry(
            definition.name,
            definition.type,
            definition.cci.nullable,
            array_type=definition.array_type,
            array_dimensions=definition.dimension,
            is_input=is_input,
        )
        io_entries.append(entry)
    return io_entries
1✔
65

66

67
def metadata_definition_to_udf_metadata(metadata_list: List[Tuple[str, str]]):
    """Build one UdfMetadataCatalogEntry per parsed metadata definition.

    Arguments:
        metadata_list(List[Tuple[str, str]]): parsed metadata definitions; the
            first two items of each definition are passed positionally to
            UdfMetadataCatalogEntry

    Returns:
        list: the created UdfMetadataCatalogEntry objects, in input order
    """
    return [UdfMetadataCatalogEntry(pair[0], pair[1]) for pair in metadata_list]
1✔
82

83

84
def extract_equi_join_keys(
    join_predicate: AbstractExpression,
    left_table_aliases: List[Alias],
    right_table_aliases: List[Alias],
) -> Tuple[List[AbstractExpression], List[AbstractExpression]]:
    """Collect the column pairs compared for equality across the two join sides.

    The predicate is split into its conjuncts. A conjunct contributes a key
    pair only when it is a COMPARE_EQUAL whose two children are both
    TUPLE_VALUE expressions, one belonging to a left alias and the other to a
    right alias. Every other conjunct is ignored.

    Args:
        join_predicate (AbstractExpression): predicate of the join
        left_table_aliases (List[Alias]): aliases of the left join input
        right_table_aliases (List[Alias]): aliases of the right join input

    Returns:
        Tuple[List[AbstractExpression], List[AbstractExpression]]: (left keys,
        right keys); entries at the same index come from the same conjunct
    """
    conjuncts = to_conjunction_list(join_predicate)
    left_names = [alias.alias_name for alias in left_table_aliases]
    right_names = [alias.alias_name for alias in right_table_aliases]

    left_join_keys, right_join_keys = [], []
    for conjunct in conjuncts:
        if conjunct.etype != ExpressionType.COMPARE_EQUAL:
            continue

        lhs, rhs = conjunct.children[0], conjunct.children[1]
        both_are_columns = (
            lhs.etype == ExpressionType.TUPLE_VALUE
            and rhs.etype == ExpressionType.TUPLE_VALUE
        )
        if not both_are_columns:
            continue

        if lhs.table_alias in left_names and rhs.table_alias in right_names:
            left_join_keys.append(lhs)
            right_join_keys.append(rhs)
        elif lhs.table_alias in right_names and rhs.table_alias in left_names:
            # Operands are written right-side first; swap so each key lands in
            # the list of the side it belongs to.
            left_join_keys.append(rhs)
            right_join_keys.append(lhs)

    return (left_join_keys, right_join_keys)
1✔
122

123

124
def extract_pushdown_predicate(
    predicate: AbstractExpression, column_alias: str
) -> Tuple[AbstractExpression, AbstractExpression]:
    """Split a predicate into a part that can be pushed down and the rest.

    A (sub)predicate is pushed down when contains_single_column() accepts it
    for ``column_alias`` and is_simple_predicate() accepts it as well.

    Args:
        predicate (AbstractExpression): predicate that needs to be decomposed
        column_alias (str): column alias the pushdown predicate is extracted for

    Returns:
        Tuple[AbstractExpression, AbstractExpression]: (pushdown predicate,
        remaining predicate). Both are None when ``predicate`` is None; the
        remaining predicate is None when the whole predicate is pushed down.
    """
    if predicate is None:
        return None, None

    def _is_pushable(candidate: AbstractExpression) -> bool:
        # Same two checks, in the same short-circuit order, for whole
        # predicates and for individual conjuncts.
        return contains_single_column(
            candidate, column_alias
        ) and is_simple_predicate(candidate)

    # Fast path: the predicate as a whole qualifies, nothing is left over.
    if _is_pushable(predicate):
        return predicate, None

    pushable, remaining = [], []
    for conjunct in to_conjunction_list(predicate):
        bucket = pushable if _is_pushable(conjunct) else remaining
        bucket.append(conjunct)

    return (
        conjunction_list_to_expression_tree(pushable),
        conjunction_list_to_expression_tree(remaining),
    )
156

157

158
def extract_pushdown_predicate_for_alias(
    predicate: AbstractExpression, aliases: List[Alias]
) -> Tuple[AbstractExpression, AbstractExpression]:
    """Extract predicate that can be pushed down based on the input aliases.

    The predicate is split into its conjuncts. A conjunct is a pushdown
    candidate when every table alias it references (the part of each column
    alias before the first ".") is among the input aliases.

    Args:
        predicate (AbstractExpression): input predicate
        aliases (List[Alias]): aliases for which predicate can be pushed

    Returns:
        Tuple[AbstractExpression, AbstractExpression]: (pushdown predicate,
        remaining predicate); (None, None) when ``predicate`` is None.
    """
    if predicate is None:
        return None, None

    pred_list = to_conjunction_list(predicate)
    # Build the lookup set once; it does not change while scanning conjuncts.
    allowed_aliases = {alias.alias_name for alias in aliases}

    pushdown_preds = []
    rem_pred = []
    for pred in pred_list:
        referenced_tables = {
            col.split(".")[0] for col in get_columns_in_predicate(pred)
        }
        if referenced_tables.issubset(allowed_aliases):
            pushdown_preds.append(pred)
        else:
            rem_pred.append(pred)

    return (
        conjunction_list_to_expression_tree(pushdown_preds),
        conjunction_list_to_expression_tree(rem_pred),
    )
188

189

190
def optimize_cache_key(expr: FunctionExpression):
    """Replace the cache key of a function expression with a cheaper one.

    The intent is to reduce caching overhead by substituting a logically
    equivalent key, e.g. keying on a frame id instead of the frame data.

    Only the simple case is handled: the function has exactly one child, that
    child is a TupleValueExpression, and its ``col_object`` is a
    ColumnCatalogEntry. The key then becomes one TupleValueExpression per
    column returned by get_table_primary_columns() for the owning table.

    Args:
        expr (FunctionExpression): expression to optimize the caching key for.

    Returns:
        list: the new key expressions, or ``expr.children`` unchanged when the
        expression does not match the simple case.

    Example:
        Yolo(data) -> return id

    Todo: Optimize complex expression
        FaceDet(Crop(data, bbox)) -> return
    """
    keys = expr.children

    # Anything other than a single plain column argument is left untouched.
    if len(keys) != 1 or not isinstance(keys[0], TupleValueExpression):
        return keys

    source_col = keys[0]
    column_entry = source_col.col_object
    if not isinstance(column_entry, ColumnCatalogEntry):
        return keys

    table_obj = CatalogManager().get_table_catalog_entry(column_entry.table_name)
    return [
        TupleValueExpression(
            col_name=pk.name,
            table_alias=source_col.table_alias,
            col_object=CatalogManager().get_column_catalog_entry(table_obj, pk.name),
            col_alias=f"{source_col.table_alias}.{pk.name}",
        )
        for pk in get_table_primary_columns(table_obj)
    ]
×
228

229

230
def enable_cache_init(func_expr: FunctionExpression) -> FunctionExpressionCache:
    """Create the cache object for a function expression.

    Looks up the catalog cache entry by the expression's signature, inserting
    a new entry when none is found, and backs the cache with a DiskKVCache at
    that entry's ``cache_path``.

    Args:
        func_expr (FunctionExpression): expression the cache is built for.

    Returns:
        FunctionExpressionCache: cache holding the key and the disk store.
    """
    cache_key = optimize_cache_key(func_expr)
    if cache_key == func_expr.children:
        # The key could not be optimized. NOTE(review): [None] is presumably
        # read by the cache consumer as "use the default key" - confirm.
        cache_key = [None]

    signature = func_expr.signature()
    entry = (
        CatalogManager().get_udf_cache_catalog_entry_by_name(signature)
        or CatalogManager().insert_udf_cache_catalog_entry(func_expr)
    )

    frozen_key = tuple(cache_key)
    return FunctionExpressionCache(
        key=frozen_key, store=DiskKVCache(entry.cache_path)
    )
1✔
244

245

246
def enable_cache(func_expr: FunctionExpression) -> FunctionExpression:
    """Return a copy of a function expression with caching turned on.

    The cache is built by enable_cache_init(), which optimizes the cache key
    and makes sure a matching cache entry exists in the catalog. The input
    expression itself is not the object the cache is attached to; a copy is.

    Args:
        func_expr (FunctionExpression): The function expression to enable cache for.

    Returns:
        FunctionExpression: The result of enable_cache() on the copy.
    """
    # Build the cache before copying, matching the established call order.
    expr_cache = enable_cache_init(func_expr)
    cached_copy = func_expr.copy()
    return cached_copy.enable_cache(expr_cache)
1✔
260

261

262
def enable_cache_on_expression_tree(expr_tree: AbstractExpression):
    """Enable caching in place on every eligible function expression in a tree.

    Eligibility is decided by check_expr_validity_for_cache().

    Args:
        expr_tree (AbstractExpression): tree whose FunctionExpression nodes are
            inspected and, when eligible, modified in place.
    """
    # Collect all candidates first so the tree is not modified while it is
    # still being traversed.
    cacheable = [
        candidate
        for candidate in expr_tree.find_all(FunctionExpression)
        if check_expr_validity_for_cache(candidate)
    ]
    for candidate in cacheable:
        candidate.enable_cache(enable_cache_init(candidate))
1✔
270

271

272
def check_expr_validity_for_cache(expr: FunctionExpression) -> bool:
    """Tell whether caching can be enabled on a function expression.

    Args:
        expr (FunctionExpression): expression to check.

    Returns:
        bool: True when the function name is listed in CACHEABLE_UDFS, the
        expression has no cache yet, and it has exactly one child that is a
        TupleValueExpression.
    """
    return (
        expr.name in CACHEABLE_UDFS
        and not expr.has_cache()
        # Require exactly one child: with "<= 1" a function without arguments
        # reached the index below and raised IndexError.
        and len(expr.children) == 1
        and isinstance(expr.children[0], TupleValueExpression)
    )
279

280

281
def get_expression_execution_cost(expr: AbstractExpression) -> float:
    """
    Estimate the cost of executing an expression from the catalog statistics.

    Only FunctionExpression nodes contribute; every other expression is
    treated as free. Each function contributes the cost stored in its catalog
    cost entry, or DEFAULT_FUNCTION_EXPRESSION_COST when no entry is found.

    Args:
        expr (AbstractExpression): The AbstractExpression object whose cost
        needs to be computed.

    Returns:
        float: The summed cost of all function expressions found in ``expr``
        (0 when there are none).
    """
    accumulated = 0
    for function_expr in expr.find_all(FunctionExpression):
        stats = CatalogManager().get_udf_cost_catalog_entry(function_expr.name)
        accumulated += stats.cost if stats else DEFAULT_FUNCTION_EXPRESSION_COST
    return accumulated
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc