• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #804

24 Sep 2023 03:30PM UTC coverage: 92.866% (-0.03%) from 92.899%
#804

push

circle-ci

Jiashen Cao
fix lint and address comments

2 of 2 new or added lines in 1 file covered. (100.0%)

11260 of 12125 relevant lines covered (92.87%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.8
/evadb/optimizer/optimizer_utils.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import typing
1✔
16
from typing import List, Tuple
1✔
17

18
if typing.TYPE_CHECKING:
19
    from evadb.optimizer.optimizer_context import OptimizerContext
20

21
from evadb.catalog.catalog_utils import get_table_primary_columns
1✔
22
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
1✔
23
from evadb.catalog.models.function_io_catalog import FunctionIOCatalogEntry
1✔
24
from evadb.catalog.models.function_metadata_catalog import FunctionMetadataCatalogEntry
1✔
25
from evadb.constants import CACHEABLE_FUNCTIONS, DEFAULT_FUNCTION_EXPRESSION_COST
1✔
26
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
1✔
27
from evadb.expression.constant_value_expression import ConstantValueExpression
1✔
28
from evadb.expression.expression_utils import (
1✔
29
    conjunction_list_to_expression_tree,
30
    contains_single_column,
31
    get_columns_in_predicate,
32
    is_simple_predicate,
33
    to_conjunction_list,
34
)
35
from evadb.expression.function_expression import (
1✔
36
    FunctionExpression,
37
    FunctionExpressionCache,
38
)
39
from evadb.expression.tuple_value_expression import TupleValueExpression
1✔
40
from evadb.parser.alias import Alias
1✔
41
from evadb.parser.create_statement import ColumnDefinition
1✔
42
from evadb.utils.kv_cache import DiskKVCache
1✔
43

44

45
def column_definition_to_function_io(col_list: List[ColumnDefinition], is_input: bool):
    """Build one FunctionIOCatalogEntry per parsed column definition.

    Arguments:
        col_list(List[ColumnDefinition]): parsed input/output definitions; a
            single ColumnDefinition is also accepted and handled as a
            one-element list
        is_input(bool): true if input else false

    Returns:
        list of FunctionIOCatalogEntry objects, in the order of ``col_list``
    """
    # Normalize a lone definition so the loop below only deals with lists.
    definitions = [col_list] if isinstance(col_list, ColumnDefinition) else col_list

    io_entries = []
    for definition in definitions:
        assert (
            definition is not None
        ), "Empty column definition while creating function io"
        entry = FunctionIOCatalogEntry(
            definition.name,
            definition.type,
            definition.cci.nullable,
            array_type=definition.array_type,
            array_dimensions=definition.dimension,
            is_input=is_input,
        )
        io_entries.append(entry)
    return io_entries
69

70

71
def metadata_definition_to_function_metadata(metadata_list: List[Tuple[str, str]]):
    """Build one FunctionMetadataCatalogEntry per parsed metadata definition.

    Arguments:
        metadata_list(List[Tuple[str, str]]): parsed metadata definitions; the
            first two items of each tuple are passed positionally to
            FunctionMetadataCatalogEntry

    Returns:
        list of FunctionMetadataCatalogEntry objects, in the order of
        ``metadata_list``
    """
    return [
        FunctionMetadataCatalogEntry(definition[0], definition[1])
        for definition in metadata_list
    ]
86

87

88
def extract_equi_join_keys(
    join_predicate: AbstractExpression,
    left_table_aliases: List[Alias],
    right_table_aliases: List[Alias],
) -> Tuple[List[AbstractExpression], List[AbstractExpression]]:
    """Collect the column pairs compared by equality in a join predicate.

    Every conjunct of ``join_predicate`` whose etype is COMPARE_EQUAL and whose
    two children are both TUPLE_VALUE expressions is inspected. When one child
    belongs to a left table alias and the other to a right table alias, the
    children are recorded as a join key pair. All other conjuncts are ignored.

    Args:
        join_predicate (AbstractExpression): predicate to decompose
        left_table_aliases (List[Alias]): aliases of the left side of the join
        right_table_aliases (List[Alias]): aliases of the right side of the join

    Returns:
        Tuple[List[AbstractExpression], List[AbstractExpression]]: (left join
        keys, right join keys); entries at the same index come from the same
        conjunct
    """
    conjuncts = to_conjunction_list(join_predicate)
    left_names = [alias.alias_name for alias in left_table_aliases]
    right_names = [alias.alias_name for alias in right_table_aliases]

    left_keys = []
    right_keys = []
    for conjunct in conjuncts:
        is_equality = conjunct.etype == ExpressionType.COMPARE_EQUAL
        if not is_equality:
            continue

        first = conjunct.children[0]
        second = conjunct.children[1]
        # only extract if both are TupleValueExpression
        both_columns = (
            first.etype == ExpressionType.TUPLE_VALUE
            and second.etype == ExpressionType.TUPLE_VALUE
        )
        if not both_columns:
            continue

        if first.table_alias in left_names and second.table_alias in right_names:
            left_keys.append(first)
            right_keys.append(second)
        elif first.table_alias in right_names and second.table_alias in left_names:
            # Operands are written right-table first; swap them into place.
            left_keys.append(second)
            right_keys.append(first)

    return (left_keys, right_keys)
126

127

128
def extract_pushdown_predicate(
    predicate: AbstractExpression, column_alias: str
) -> Tuple[AbstractExpression, AbstractExpression]:
    """Split a predicate into the part that can be pushed down and the rest.

    A (sub)predicate is pushed down when ``contains_single_column`` accepts it
    for ``column_alias`` and ``is_simple_predicate`` accepts it as well.

    Args:
        predicate (AbstractExpression): predicate that needs to be decomposed
        column_alias (str): column alias to extract the predicate for
    Returns:
        Tuple[AbstractExpression, AbstractExpression]: (pushdown predicate,
        remaining predicate); (None, None) when ``predicate`` is None
    """
    if predicate is None:
        return None, None

    def _is_pushable(candidate):
        return contains_single_column(candidate, column_alias) and is_simple_predicate(
            candidate
        )

    # The whole predicate qualifies: nothing is left behind.
    if _is_pushable(predicate):
        return predicate, None

    pushable = []
    remaining = []
    for conjunct in to_conjunction_list(predicate):
        bucket = pushable if _is_pushable(conjunct) else remaining
        bucket.append(conjunct)

    return (
        conjunction_list_to_expression_tree(pushable),
        conjunction_list_to_expression_tree(remaining),
    )
160

161

162
def extract_pushdown_predicate_for_alias(
    predicate: AbstractExpression, aliases: List[Alias]
):
    """Extract predicate that can be pushed down based on the input aliases.

    Atomic predicates on the table columns that are the subset of the input aliases are
    considered as candidates for pushdown.

    Args:
        predicate (AbstractExpression): input predicate
        aliases (List[Alias]): aliases for which predicate can be pushed

    Returns:
        tuple: (pushdown predicate, remaining predicate), each produced by
        ``conjunction_list_to_expression_tree``; (None, None) when
        ``predicate`` is None
    """
    if predicate is None:
        return None, None

    pred_list = to_conjunction_list(predicate)
    # Built once: the set of allowed table aliases does not change per conjunct.
    allowed_tables = {alias.alias_name for alias in aliases}

    pushdown_preds = []
    rem_pred = []
    for pred in pred_list:
        # The text before the first "." of each column alias is taken as the
        # table alias the column belongs to.
        referenced_tables = {
            col.split(".")[0] for col in get_columns_in_predicate(pred)
        }
        # A conjunct that references no columns yields an empty set, which is a
        # subset of anything, so it lands in the pushdown list.
        if referenced_tables.issubset(allowed_tables):
            pushdown_preds.append(pred)
        else:
            rem_pred.append(pred)

    return (
        conjunction_list_to_expression_tree(pushdown_preds),
        conjunction_list_to_expression_tree(rem_pred),
    )
192

193

194
def optimize_cache_key_for_tuple_value_expression(
    context: "OptimizerContext", tv_expr: TupleValueExpression
):
    """Replace a column expression with expressions over its table's primary columns.

    Args:
        context (OptimizerContext): associated optimizer context; its catalog
            is used to look up the table and column entries
        tv_expr (TupleValueExpression): column expression used as cache key

    Returns:
        list: one TupleValueExpression per column returned by
        ``get_table_primary_columns`` for the table of ``tv_expr``, each reusing
        the table alias of ``tv_expr``. When ``tv_expr.col_object`` is not a
        ColumnCatalogEntry, ``[tv_expr]`` is returned unchanged.
    """
    catalog = context.db.catalog()
    column_entry = tv_expr.col_object

    # Without a catalog-backed column there is no table to look up.
    if not isinstance(column_entry, ColumnCatalogEntry):
        return [tv_expr]

    table_entry = catalog.get_table_catalog_entry(column_entry.table_name)
    return [
        TupleValueExpression(
            name=primary_col.name,
            table_alias=tv_expr.table_alias,
            col_object=catalog.get_column_catalog_entry(table_entry, primary_col.name),
            col_alias=f"{tv_expr.table_alias}.{primary_col.name}",
        )
        for primary_col in get_table_primary_columns(table_entry)
    ]
218

219

220
def optimize_cache_key_for_constant_value_expression(
    context: "OptimizerContext", cv_expr: ConstantValueExpression
):
    """Return the cache key for a constant value expression.

    No additional optimization is needed for a constant, so the expression is
    returned as-is inside a one-element list. ``context`` is not used.
    """
    return [cv_expr]
225

226

227
def optimize_cache_key(context: "OptimizerContext", expr: FunctionExpression):
    """Optimize the cache key of a function expression.

    The goal is to reduce the caching overhead by swapping the caching key for
    a logically equivalent one. For instance, frame data can be replaced with
    frame id.

    Args:
        context (OptimizerContext): associated optimizer context
        expr (FunctionExpression): expression to optimize the caching key for.

    Returns:
        list: the optimized key expressions, or ``expr.children`` itself when
        the argument shape is not one of the handled cases.

    Example:
        Yolo(data) -> return id

    Todo: Optimize complex expression
        FaceDet(Crop(data, bbox)) -> return

    """
    args = expr.children
    arg_count = len(args)

    # Simple case: a single column input.
    if arg_count == 1 and isinstance(args[0], TupleValueExpression):
        return optimize_cache_key_for_tuple_value_expression(context, args[0])

    # ConstantValueExpression followed by a TupleValueExpression.
    is_constant_then_column = (
        arg_count == 2
        and isinstance(args[0], ConstantValueExpression)
        and isinstance(args[1], TupleValueExpression)
    )
    if is_constant_then_column:
        constant_keys = optimize_cache_key_for_constant_value_expression(
            context, args[0]
        )
        column_keys = optimize_cache_key_for_tuple_value_expression(context, args[1])
        return constant_keys + column_keys

    return args
260

261

262
def enable_cache_init(
    context: "OptimizerContext", func_expr: FunctionExpression
) -> FunctionExpressionCache:
    """Create the FunctionExpressionCache for a function expression.

    The cache key comes from ``optimize_cache_key``. If that key compares equal
    to the expression's own children, ``[None]`` is used as the key instead.
    The catalog cache entry is looked up by the expression's signature and
    inserted when the lookup returns nothing.

    Args:
        context (OptimizerContext): associated optimizer context
        func_expr (FunctionExpression): expression to build the cache for

    Returns:
        FunctionExpressionCache: cache whose store is a DiskKVCache at the
        catalog entry's ``cache_path``
    """
    optimized = optimize_cache_key(context, func_expr)
    key_parts = [None] if optimized == func_expr.children else optimized

    catalog = context.db.catalog()
    signature = func_expr.signature()
    entry = catalog.get_function_cache_catalog_entry_by_name(
        signature
    ) or catalog.insert_function_cache_catalog_entry(func_expr)

    return FunctionExpressionCache(
        key=tuple(key_parts), store=DiskKVCache(entry.cache_path)
    )
279

280

281
def enable_cache(
    context: "OptimizerContext", func_expr: FunctionExpression
) -> FunctionExpression:
    """Return a cache-enabled copy of a function expression.

    The cache is built by ``enable_cache_init``, which optimizes the cache key
    and makes sure a cache entry exists in the catalog for the expression.

    Args:
        context (OptimizerContext): associated optimizer context
        func_expr (FunctionExpression): The function expression to enable cache for.

    Returns:
        FunctionExpression: the result of calling ``enable_cache`` on a copy of
        ``func_expr``.
    """
    cache = enable_cache_init(context, func_expr)
    duplicate = func_expr.copy()
    return duplicate.enable_cache(cache)
298

299

300
def enable_cache_on_expression_tree(
    context: "OptimizerContext", expr_tree: AbstractExpression
):
    """Enable caching in place on every eligible function expression of a tree.

    Args:
        context (OptimizerContext): associated optimizer context
        expr_tree (AbstractExpression): tree searched for FunctionExpression
            nodes; nodes accepted by ``check_expr_validity_for_cache`` get a
            cache attached
    """
    # Collect every candidate first, then attach caches in a second pass.
    cacheable = [
        func_expr
        for func_expr in expr_tree.find_all(FunctionExpression)
        if check_expr_validity_for_cache(func_expr)
    ]
    for func_expr in cacheable:
        func_expr.enable_cache(enable_cache_init(context, func_expr))
310

311

312
def check_expr_validity_for_cache(expr: FunctionExpression):
    """Tell whether a function expression is eligible for caching.

    The expression must be named in CACHEABLE_FUNCTIONS and must not have a
    cache yet. On top of that, the argument shape is checked: one argument must
    be a TupleValueExpression; two arguments must be a ConstantValueExpression
    followed by a TupleValueExpression. Other argument counts add no further
    restriction.

    Args:
        expr (FunctionExpression): expression to check

    Returns:
        bool: True when the expression can have a cache enabled
    """
    if expr.name not in CACHEABLE_FUNCTIONS or expr.has_cache():
        return False

    args = expr.children
    if len(args) == 1:
        # Normal function that only takes one parameter.
        return isinstance(args[0], TupleValueExpression)
    if len(args) == 2:
        # LLM-based function that takes two parameters.
        return isinstance(args[0], ConstantValueExpression) and isinstance(
            args[1], TupleValueExpression
        )
    return True
323

324

325
def get_expression_execution_cost(
    context: "OptimizerContext", expr: AbstractExpression
) -> float:
    """
    Estimate the cost of executing an expression from catalog statistics.

    Only FunctionExpression nodes contribute to the estimate; every other kind
    of expression is assumed to cost zero. Each function expression is looked
    up in the catalog by name; when no cost entry is found,
    DEFAULT_FUNCTION_EXPRESSION_COST is charged for it.

    Args:
        context (OptimizerContext): the associated optimizer context
        expr (AbstractExpression): The AbstractExpression object whose cost
        needs to be computed.

    Returns:
        float: The summed cost over all function expressions found in ``expr``
        (0 when there are none).
    """

    def _cost_of(func_expr):
        entry = context.db.catalog().get_function_cost_catalog_entry(func_expr.name)
        return entry.cost if entry else DEFAULT_FUNCTION_EXPRESSION_COST

    # accumulate the cost of every function expression in the tree
    return sum(map(_cost_of, expr.find_all(FunctionExpression)))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc