• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / a4c010ba-78be-4818-8e6f-1da08c6af280

31 Aug 2023 11:59PM UTC coverage: 70.992% (-10.6%) from 81.552%
a4c010ba-78be-4818-8e6f-1da08c6af280

push

circle-ci

web-flow
Merge branch 'staging' into evadb_staging

54 of 54 new or added lines in 3 files covered. (100.0%)

8020 of 11297 relevant lines covered (70.99%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.26
/evadb/optimizer/optimizer_utils.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import typing
1✔
16
from typing import List, Tuple
1✔
17

18
if typing.TYPE_CHECKING:
19
    from evadb.optimizer.optimizer_context import OptimizerContext
20

21
from evadb.catalog.catalog_utils import get_table_primary_columns
1✔
22
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
1✔
23
from evadb.catalog.models.udf_io_catalog import UdfIOCatalogEntry
1✔
24
from evadb.catalog.models.udf_metadata_catalog import UdfMetadataCatalogEntry
1✔
25
from evadb.constants import CACHEABLE_UDFS, DEFAULT_FUNCTION_EXPRESSION_COST
1✔
26
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
1✔
27
from evadb.expression.expression_utils import (
1✔
28
    conjunction_list_to_expression_tree,
29
    contains_single_column,
30
    get_columns_in_predicate,
31
    is_simple_predicate,
32
    to_conjunction_list,
33
)
34
from evadb.expression.function_expression import (
1✔
35
    FunctionExpression,
36
    FunctionExpressionCache,
37
)
38
from evadb.expression.tuple_value_expression import TupleValueExpression
1✔
39
from evadb.parser.alias import Alias
1✔
40
from evadb.parser.create_statement import ColumnDefinition
1✔
41
from evadb.utils.kv_cache import DiskKVCache
1✔
42

43

44
def column_definition_to_udf_io(col_list: List[ColumnDefinition], is_input: bool):
    """Convert parsed column definitions into UdfIOCatalogEntry objects.

    A lone ColumnDefinition is also accepted and treated as a one-element list.

    Arguments:
        col_list (List[ColumnDefinition] | ColumnDefinition): parsed
            input/output definitions
        is_input (bool): True for input definitions, False for outputs

    Returns:
        List[UdfIOCatalogEntry]: one entry per definition, in the given order
    """
    definitions = [col_list] if isinstance(col_list, ColumnDefinition) else col_list

    io_entries = []
    for definition in definitions:
        assert (
            definition is not None
        ), "Empty column definition while creating udf io"
        io_entries.append(
            UdfIOCatalogEntry(
                definition.name,
                definition.type,
                definition.cci.nullable,
                array_type=definition.array_type,
                array_dimensions=definition.dimension,
                is_input=is_input,
            )
        )
    return io_entries
68

69

70
def metadata_definition_to_udf_metadata(metadata_list: List[Tuple[str, str]]):
    """Create a UdfMetadataCatalogEntry for each metadata definition provided.

    Arguments:
        metadata_list (List[Tuple[str, str]]): parsed metadata definitions. The
            first element of each tuple becomes the entry's first constructor
            argument and the second element its second argument (presumably
            key and value -- TODO confirm against UdfMetadataCatalogEntry).

    Returns:
        List[UdfMetadataCatalogEntry]: one entry per definition, in the given
        order; an empty list when ``metadata_list`` is empty.
    """
    # Index explicitly (rather than unpacking) so tuples carrying extra
    # trailing elements keep being accepted.
    return [
        UdfMetadataCatalogEntry(metadata[0], metadata[1])
        for metadata in metadata_list
    ]
85

86

87
def extract_equi_join_keys(
    join_predicate: AbstractExpression,
    left_table_aliases: List[Alias],
    right_table_aliases: List[Alias],
) -> Tuple[List[AbstractExpression], List[AbstractExpression]]:
    """Pull equi-join key pairs out of a join predicate.

    The predicate is split into its conjuncts. A conjunct contributes a key pair
    when it is an equality whose two children are both TupleValueExpressions,
    with one side's table alias among the left aliases and the other side's
    among the right aliases. The pair is oriented so the left-side column lands
    in the first list and the right-side column in the second. All other
    conjuncts are ignored.

    Args:
        join_predicate (AbstractExpression): the join condition
        left_table_aliases (List[Alias]): aliases of the left join input
        right_table_aliases (List[Alias]): aliases of the right join input

    Returns:
        Tuple[List[AbstractExpression], List[AbstractExpression]]:
        (left join keys, right join keys), aligned by position
    """
    conjuncts = to_conjunction_list(join_predicate)
    left_keys = []
    right_keys = []
    left_names = [alias.alias_name for alias in left_table_aliases]
    right_names = [alias.alias_name for alias in right_table_aliases]

    for conjunct in conjuncts:
        if conjunct.etype != ExpressionType.COMPARE_EQUAL:
            continue
        lhs = conjunct.children[0]
        rhs = conjunct.children[1]
        # only column-to-column equalities can serve as join keys
        if not (
            lhs.etype == ExpressionType.TUPLE_VALUE
            and rhs.etype == ExpressionType.TUPLE_VALUE
        ):
            continue
        if lhs.table_alias in left_names and rhs.table_alias in right_names:
            left_keys.append(lhs)
            right_keys.append(rhs)
        elif lhs.table_alias in right_names and rhs.table_alias in left_names:
            # written as right = left: swap so each key lines up with its side
            left_keys.append(rhs)
            right_keys.append(lhs)

    return left_keys, right_keys
125

126

127
def extract_pushdown_predicate(
    predicate: AbstractExpression, column_alias: str
) -> Tuple[AbstractExpression, AbstractExpression]:
    """Decompose the predicate into a pushdown predicate and a remaining predicate.

    A conjunct is pushed down when it references only ``column_alias`` and is a
    simple predicate (as decided by ``contains_single_column`` and
    ``is_simple_predicate``); every other conjunct stays in the remaining
    predicate.

    Args:
        predicate (AbstractExpression): predicate that needs to be decomposed;
            may be None
        column_alias (str): column alias to extract the predicate for

    Returns:
        Tuple[AbstractExpression, AbstractExpression]: (pushdown predicate,
        remaining predicate). ``(None, None)`` when ``predicate`` is None, and
        ``(predicate, None)`` when the whole predicate can be pushed down.
    """
    if predicate is None:
        return None, None

    # Fast path: the predicate as a whole qualifies, so no split is needed.
    if contains_single_column(predicate, column_alias) and is_simple_predicate(
        predicate
    ):
        return predicate, None

    pushdown_preds = []
    rem_pred = []
    for pred in to_conjunction_list(predicate):
        if contains_single_column(pred, column_alias) and is_simple_predicate(pred):
            pushdown_preds.append(pred)
        else:
            rem_pred.append(pred)

    return (
        conjunction_list_to_expression_tree(pushdown_preds),
        conjunction_list_to_expression_tree(rem_pred),
    )
159

160

161
def extract_pushdown_predicate_for_alias(
    predicate: AbstractExpression, aliases: List[Alias]
):
    """Extract predicate that can be pushed down based on the input aliases.

    The predicate is split into conjuncts. A conjunct is a pushdown candidate
    when every column it references (per ``get_columns_in_predicate``) belongs
    to one of the given table aliases. A column's table alias is the text before
    the first "." in its name. A conjunct that references no columns yields an
    empty alias set and is therefore always pushed down.

    Args:
        predicate (AbstractExpression): input predicate; may be None
        aliases (List[Alias]): aliases for which predicate can be pushed

    Returns:
        Tuple[AbstractExpression, AbstractExpression]: (pushdown predicate,
        remaining predicate); ``(None, None)`` when ``predicate`` is None.
    """
    if predicate is None:
        return None, None

    pred_list = to_conjunction_list(predicate)
    # Build the lookup set once, not once per conjunct, and keep the
    # ``aliases`` parameter intact instead of rebinding it.
    alias_names = {alias.alias_name for alias in aliases}

    pushdown_preds = []
    rem_pred = []
    for pred in pred_list:
        table_aliases = {col.split(".")[0] for col in get_columns_in_predicate(pred)}
        if table_aliases.issubset(alias_names):
            pushdown_preds.append(pred)
        else:
            rem_pred.append(pred)

    return (
        conjunction_list_to_expression_tree(pushdown_preds),
        conjunction_list_to_expression_tree(rem_pred),
    )
191

192

193
def optimize_cache_key(context: "OptimizerContext", expr: FunctionExpression):
    """Return a cheaper, logically equivalent cache key for ``expr``.

    This reduces caching overhead, e.g. frame data can be replaced by the frame
    id (``Yolo(data) -> id``). Only the one-argument case is handled: when the
    function's sole child is a TupleValueExpression whose ``col_object`` is a
    ColumnCatalogEntry, the key becomes TupleValueExpressions over the primary
    columns of that column's table. Otherwise ``expr.children`` is returned
    unchanged.

    Args:
        context (OptimizerContext): optimizer context used to reach the catalog
        expr (FunctionExpression): expression to optimize the caching key for.

    Returns:
        list: the rewritten key expressions, or ``expr.children`` itself when
        no rewrite applies.

    Todo: optimize complex expressions such as FaceDet(Crop(data, bbox)).
    """
    children = expr.children
    catalog = context.db.catalog()

    # only simple one-column inputs are rewritten
    if len(children) != 1 or not isinstance(children[0], TupleValueExpression):
        return children

    column = children[0]
    column_entry = column.col_object
    if not isinstance(column_entry, ColumnCatalogEntry):
        return children

    table_entry = catalog.get_table_catalog_entry(column_entry.table_name)
    rewritten_keys = []
    for primary_col in get_table_primary_columns(table_entry):
        rewritten_keys.append(
            TupleValueExpression(
                name=primary_col.name,
                table_alias=column.table_alias,
                col_object=catalog.get_column_catalog_entry(
                    table_entry, primary_col.name
                ),
                col_alias=f"{column.table_alias}.{primary_col.name}",
            )
        )
    return rewritten_keys
230

231

232
def enable_cache_init(
    context: "OptimizerContext", func_expr: FunctionExpression
) -> FunctionExpressionCache:
    """Build the FunctionExpressionCache backing ``func_expr``.

    The cache key comes from ``optimize_cache_key``. When that returns the
    expression's own children (no rewrite was possible), the key is
    ``(None,)`` instead. The catalog is searched for a UDF cache entry named
    after ``func_expr.signature()``, and one is inserted if none exists.

    Args:
        context (OptimizerContext): associated optimizer context
        func_expr (FunctionExpression): expression the cache is created for

    Returns:
        FunctionExpressionCache: cache keyed on the (optimized) key, backed by a
        DiskKVCache at the catalog entry's ``cache_path``.
    """
    key = optimize_cache_key(context, func_expr)
    if key == func_expr.children:
        # NOTE(review): a None key appears to mean "no optimized key"; confirm
        key = [None]

    catalog = context.db.catalog()
    signature = func_expr.signature()
    cache_entry = catalog.get_udf_cache_catalog_entry_by_name(
        signature
    ) or catalog.insert_udf_cache_catalog_entry(func_expr)

    key_tuple = tuple(key)
    disk_store = DiskKVCache(cache_entry.cache_path)
    return FunctionExpressionCache(key=key_tuple, store=disk_store)
249

250

251
def enable_cache(
    context: "OptimizerContext", func_expr: FunctionExpression
) -> FunctionExpression:
    """Return a copy of ``func_expr`` with caching enabled.

    The cache (and its catalog entry) is created by ``enable_cache_init``. The
    cache is attached to a copy of the expression, so ``func_expr`` itself is
    not modified here.

    Args:
        context (OptimizerContext): associated optimizer context
        func_expr (FunctionExpression): The function expression to enable cache for.

    Returns:
        FunctionExpression: whatever ``enable_cache`` returns on the copy
        (presumably the copy itself, with cache enabled -- TODO confirm).
    """
    expr_cache = enable_cache_init(context, func_expr)
    cached_copy = func_expr.copy()
    return cached_copy.enable_cache(expr_cache)
268

269

270
def enable_cache_on_expression_tree(
    context: "OptimizerContext", expr_tree: AbstractExpression
):
    """Enable caching in place on every eligible function expression in a tree.

    Eligibility is decided by ``check_expr_validity_for_cache``; each eligible
    expression gets a cache built by ``enable_cache_init``.

    Args:
        context (OptimizerContext): associated optimizer context
        expr_tree (AbstractExpression): root of the expression tree to scan
    """
    # Collect and filter everything first, then mutate, so the tree walk never
    # observes expressions that already had their cache enabled.
    all_func_exprs = list(expr_tree.find_all(FunctionExpression))
    eligible = [
        func_expr
        for func_expr in all_func_exprs
        if check_expr_validity_for_cache(func_expr)
    ]
    for func_expr in eligible:
        func_expr.enable_cache(enable_cache_init(context, func_expr))
280

281

282
def check_expr_validity_for_cache(expr: FunctionExpression):
    """Return True if caching can be enabled on ``expr``.

    An expression qualifies when its name is in CACHEABLE_UDFS, it has no cache
    attached yet, and it takes exactly one argument, which is a
    TupleValueExpression.

    Args:
        expr (FunctionExpression): candidate function expression

    Returns:
        bool: whether a cache may be enabled on ``expr``
    """
    return (
        expr.name in CACHEABLE_UDFS
        and not expr.has_cache()
        # Require exactly one child so ``children[0]`` below always exists;
        # a ``<= 1`` check would raise IndexError for zero-argument calls.
        and len(expr.children) == 1
        and isinstance(expr.children[0], TupleValueExpression)
    )
289

290

291
def get_expression_execution_cost(
    context: "OptimizerContext", expr: AbstractExpression
) -> float:
    """
    Estimate the cost of executing ``expr`` from the statistics in the catalog.

    Only the FunctionExpressions found by ``expr.find_all`` contribute; every
    other expression is treated as costing zero. For each function expression,
    the catalog's UDF cost entry is used when one exists; otherwise
    DEFAULT_FUNCTION_EXPRESSION_COST is charged.

    Args:
        context (OptimizerContext): the associated optimizer context
        expr (AbstractExpression): The AbstractExpression object whose cost
            needs to be computed.

    Returns:
        float: The estimated cost of executing the expression (0 when it
        contains no function expressions).
    """
    func_exprs = list(expr.find_all(FunctionExpression))
    if not func_exprs:
        # nothing to price; avoid touching the catalog at all
        return 0

    # Obtain the catalog once instead of once per function expression.
    catalog = context.db.catalog()
    total_cost = 0
    for func_expr in func_exprs:
        cost_entry = catalog.get_udf_cost_catalog_entry(func_expr.name)
        if cost_entry:
            total_cost += cost_entry.cost
        else:
            total_cost += DEFAULT_FUNCTION_EXPRESSION_COST
    return total_cost
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc