• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #811

25 Sep 2023 03:38AM UTC coverage: 92.737% (-0.1%) from 92.866%
#811

push

circle-ci

Jiashen Cao
make code more modular

11390 of 12282 relevant lines covered (92.74%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.69
/evadb/optimizer/optimizer_utils.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import typing
1✔
16
from typing import List, Tuple
1✔
17

18
if typing.TYPE_CHECKING:
19
    from evadb.optimizer.optimizer_context import OptimizerContext
20

21
from evadb.catalog.catalog_utils import get_table_primary_columns
1✔
22
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
1✔
23
from evadb.catalog.models.function_io_catalog import FunctionIOCatalogEntry
1✔
24
from evadb.catalog.models.function_metadata_catalog import FunctionMetadataCatalogEntry
1✔
25
from evadb.constants import CACHEABLE_FUNCTIONS, DEFAULT_FUNCTION_EXPRESSION_COST
1✔
26
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
1✔
27
from evadb.expression.constant_value_expression import ConstantValueExpression
1✔
28
from evadb.expression.expression_utils import (
1✔
29
    conjunction_list_to_expression_tree,
30
    contains_single_column,
31
    get_columns_in_predicate,
32
    is_simple_predicate,
33
    to_conjunction_list,
34
)
35
from evadb.expression.function_expression import (
1✔
36
    FunctionExpression,
37
    FunctionExpressionCache,
38
)
39
from evadb.expression.tuple_value_expression import TupleValueExpression
1✔
40
from evadb.parser.alias import Alias
1✔
41
from evadb.parser.create_statement import ColumnDefinition
1✔
42
from evadb.utils.kv_cache import DiskKVCache
1✔
43

44

45
def column_definition_to_function_io(col_list: List[ColumnDefinition], is_input: bool):
    """Build one FunctionIOCatalogEntry per parsed column definition.

    Args:
        col_list (List[ColumnDefinition]): parsed input or output definitions.
            A single ColumnDefinition is also accepted and treated as a
            one-element list.
        is_input (bool): True when the definitions describe function inputs,
            False when they describe outputs.

    Returns:
        List[FunctionIOCatalogEntry]: one entry per definition, in input order.
    """
    definitions = [col_list] if isinstance(col_list, ColumnDefinition) else col_list

    io_entries = []
    for definition in definitions:
        assert (
            definition is not None
        ), "Empty column definition while creating function io"
        io_entries.append(
            FunctionIOCatalogEntry(
                definition.name,
                definition.type,
                definition.cci.nullable,
                array_type=definition.array_type,
                array_dimensions=definition.dimension,
                is_input=is_input,
            )
        )
    return io_entries
1✔
69

70

71
def metadata_definition_to_function_metadata(metadata_list: List[Tuple[str, str]]):
    """Build one FunctionMetadataCatalogEntry per parsed metadata definition.

    Args:
        metadata_list (List[Tuple[str, str]]): parsed metadata definitions.
            Element 0 of each pair becomes the entry's first constructor
            argument and element 1 its second (presumably key and value;
            confirm against the parser).

    Returns:
        List[FunctionMetadataCatalogEntry]: one entry per definition, in input
        order.
    """
    return [
        FunctionMetadataCatalogEntry(definition[0], definition[1])
        for definition in metadata_list
    ]
1✔
86

87

88
def extract_equi_join_keys(
    join_predicate: AbstractExpression,
    left_table_aliases: List[Alias],
    right_table_aliases: List[Alias],
) -> Tuple[List[AbstractExpression], List[AbstractExpression]]:
    """Collect aligned left/right join keys from the equality conjuncts of a predicate.

    Only conjuncts of type COMPARE_EQUAL are considered, and only when both
    operands are TUPLE_VALUE expressions. A conjunct yields a key pair when one
    operand's table alias belongs to the left side and the other's to the right
    side. The pair is normalised so that the left-side operand always goes into
    the first list. All other conjuncts are ignored.

    Args:
        join_predicate (AbstractExpression): the join condition.
        left_table_aliases (List[Alias]): aliases of the left join input.
        right_table_aliases (List[Alias]): aliases of the right join input.

    Returns:
        Tuple[List[AbstractExpression], List[AbstractExpression]]: the left
        join keys and the right join keys, aligned by index.
    """
    conjuncts = to_conjunction_list(join_predicate)
    left_names = [alias.alias_name for alias in left_table_aliases]
    right_names = [alias.alias_name for alias in right_table_aliases]

    left_keys = []
    right_keys = []
    for conjunct in conjuncts:
        if conjunct.etype != ExpressionType.COMPARE_EQUAL:
            continue

        lhs = conjunct.children[0]
        rhs = conjunct.children[1]
        # only extract if both are TupleValueExpression
        if not (
            lhs.etype == ExpressionType.TUPLE_VALUE
            and rhs.etype == ExpressionType.TUPLE_VALUE
        ):
            continue

        if lhs.table_alias in left_names and rhs.table_alias in right_names:
            left_keys.append(lhs)
            right_keys.append(rhs)
        elif lhs.table_alias in right_names and rhs.table_alias in left_names:
            # Operands were written right-to-left; swap them into place.
            left_keys.append(rhs)
            right_keys.append(lhs)

    return (left_keys, right_keys)
1✔
126

127

128
def extract_pushdown_predicate(
    predicate: AbstractExpression, column_alias: str
) -> Tuple[AbstractExpression, AbstractExpression]:
    """Decompose the predicate into a pushdown predicate and a remaining predicate.

    A conjunct is pushed down when ``contains_single_column`` reports that it
    references only ``column_alias`` and ``is_simple_predicate`` accepts it.
    Every other conjunct stays in the remainder.

    Args:
        predicate (AbstractExpression): predicate that needs to be decomposed;
            may be None.
        column_alias (str): column alias to extract the predicate for.

    Returns:
        Tuple[AbstractExpression, AbstractExpression]: (pushdown predicate,
        remaining predicate). Each part is built with
        ``conjunction_list_to_expression_tree``. Both parts are None when
        ``predicate`` is None, and the remainder is None when the whole
        predicate qualifies.
    """
    if predicate is None:
        return None, None

    # Fast path: the whole predicate qualifies, so there is nothing to split.
    if contains_single_column(predicate, column_alias) and is_simple_predicate(
        predicate
    ):
        return predicate, None

    pushed = []
    kept = []
    for conjunct in to_conjunction_list(predicate):
        qualifies = contains_single_column(
            conjunct, column_alias
        ) and is_simple_predicate(conjunct)
        (pushed if qualifies else kept).append(conjunct)

    return (
        conjunction_list_to_expression_tree(pushed),
        conjunction_list_to_expression_tree(kept),
    )
160

161

162
def extract_pushdown_predicate_for_alias(
    predicate: AbstractExpression, aliases: List[Alias]
):
    """Extract the conjuncts of a predicate that can be pushed down to ``aliases``.

    Each conjunct is inspected on its own. If every table alias referenced by
    its columns is one of ``aliases``, the conjunct is a pushdown candidate;
    otherwise it stays in the remaining predicate.

    Args:
        predicate (AbstractExpression): input predicate; may be None.
        aliases (List[Alias]): aliases for which predicates can be pushed down.

    Returns:
        Tuple: (pushdown predicate, remaining predicate), each built with
        ``conjunction_list_to_expression_tree``; (None, None) when
        ``predicate`` is None.
    """
    if predicate is None:
        return None, None

    conjuncts = to_conjunction_list(predicate)
    allowed_tables = {alias.alias_name for alias in aliases}

    pushed = []
    kept = []
    for conjunct in conjuncts:
        # The text before the first "." of each column name is treated as its
        # table alias.
        referenced_tables = {
            column.split(".")[0] for column in get_columns_in_predicate(conjunct)
        }
        if referenced_tables <= allowed_tables:
            pushed.append(conjunct)
        else:
            kept.append(conjunct)

    return (
        conjunction_list_to_expression_tree(pushed),
        conjunction_list_to_expression_tree(kept),
    )
192

193

194
def optimize_cache_key_for_tuple_value_expression(
    context: "OptimizerContext", tv_expr: TupleValueExpression
):
    """Return the cache-key expressions that stand in for a column reference.

    When ``tv_expr.col_object`` is a ColumnCatalogEntry, the column is replaced
    by one TupleValueExpression per primary column of its table (as reported by
    ``get_table_primary_columns``). Each replacement keeps the original table
    alias and uses ``"<table_alias>.<column>"`` as its column alias. Otherwise
    the expression itself is the key.

    Args:
        context (OptimizerContext): associated optimizer context.
        tv_expr (TupleValueExpression): the column reference used as a key.

    Returns:
        list: the replacement key expressions. When the column is catalog-backed
        this may be empty if the table reports no primary columns.
    """
    catalog = context.db.catalog()
    column_entry = tv_expr.col_object

    if not isinstance(column_entry, ColumnCatalogEntry):
        # Not backed by a catalog column: nothing to substitute.
        return [tv_expr]

    table_entry = catalog.get_table_catalog_entry(column_entry.table_name)
    alias = tv_expr.table_alias
    return [
        TupleValueExpression(
            name=primary.name,
            table_alias=alias,
            col_object=catalog.get_column_catalog_entry(table_entry, primary.name),
            col_alias=f"{alias}.{primary.name}",
        )
        for primary in get_table_primary_columns(table_entry)
    ]
×
218

219

220
def optimize_cache_key_for_constant_value_expression(
    context: "OptimizerContext", cv_expr: ConstantValueExpression
):
    """Return the cache key for a constant argument.

    No additional optimization is needed for a constant value expression, so it
    is returned unchanged, wrapped in a one-element list like the other key
    optimizers' results. ``context`` is unused; it is accepted so that every
    key optimizer shares the same call signature.
    """
    optimized = [cv_expr]
    return optimized
1✔
225

226

227
def optimize_cache_key(context: "OptimizerContext", expr: FunctionExpression):
    """Optimize the cache key.

    It tries to reduce the caching overhead by replacing the caching key with a
    logically equivalent key. For instance, frame data can be replaced with
    frame id.

    Each child of ``expr`` is passed to the key optimizer registered for its
    exact type. The per-child results (each a list) are concatenated into one
    new, flat list. ``expr.children`` itself is left untouched, so callers can
    compare the result against it to see whether anything was optimized.

    Args:
        context (OptimizerContext): associated optimizer context.
        expr (FunctionExpression): expression to optimize the caching key for.

    Returns:
        list: the optimized key expressions, in child order.

    Raises:
        RuntimeError: if a child's type has no registered key optimizer.

    Example:
        Yolo(data) -> return id

    Todo: Optimize complex expression
        FaceDet(Crop(data, bbox)) -> return
    """
    optimize_key_mapping_f = {
        TupleValueExpression: optimize_cache_key_for_tuple_value_expression,
        ConstantValueExpression: optimize_cache_key_for_constant_value_expression,
    }

    # Build a fresh list: writing back into expr.children would corrupt the
    # expression (its children would become lists). It would also make the
    # result alias expr.children, so callers' "did anything change?" checks
    # would always see equality.
    optimized_keys = []
    for key in expr.children:
        # Dispatch on the exact type (not isinstance), matching the original
        # lookup: unregistered subclasses are rejected.
        optimizer = optimize_key_mapping_f.get(type(key))
        if optimizer is None:
            raise RuntimeError(f"Optimize cache key of {type(key)} is not implemented")
        # Each optimizer returns a list of keys; flatten them.
        optimized_keys.extend(optimizer(context, key))
    return optimized_keys
256

1✔
257

258
def enable_cache_init(
    context: "OptimizerContext", func_expr: FunctionExpression
) -> FunctionExpressionCache:
    """Build the FunctionExpressionCache for ``func_expr``.

    The cache key comes from ``optimize_cache_key``. When that result compares
    equal to the expression's own children, the key falls back to ``(None,)``.
    The store is a DiskKVCache at the ``cache_path`` of the function-cache
    catalog entry named by ``func_expr.signature()``. A new catalog entry is
    inserted if the lookup finds none.

    Args:
        context (OptimizerContext): associated optimizer context.
        func_expr (FunctionExpression): expression to build the cache for.

    Returns:
        FunctionExpressionCache: cache bound to the key and the on-disk store.
    """
    candidate_keys = optimize_cache_key(context, func_expr)
    # NOTE(review): a (None,) key presumably tells the cache to key on the raw
    # arguments; confirm against FunctionExpression's cache lookup.
    key_exprs = [None] if candidate_keys == func_expr.children else candidate_keys

    catalog = context.db.catalog()
    signature = func_expr.signature()
    cache_entry = catalog.get_function_cache_catalog_entry_by_name(
        signature
    ) or catalog.insert_function_cache_catalog_entry(func_expr)

    return FunctionExpressionCache(
        key=tuple(key_exprs), store=DiskKVCache(cache_entry.cache_path)
    )
275

1✔
276

277
def enable_cache(
    context: "OptimizerContext", func_expr: FunctionExpression
) -> FunctionExpression:
    """Return a cache-enabled copy of a function expression.

    The cache (optimized key plus catalog-backed store) is built by
    ``enable_cache_init``. It is then attached to ``func_expr.copy()``, and
    whatever that copy's ``enable_cache`` returns is handed back.

    Args:
        context (OptimizerContext): associated optimizer context.
        func_expr (FunctionExpression): the function expression to enable cache for.

    Returns:
        FunctionExpression: the function expression with cache enabled.
    """
    expr_cache = enable_cache_init(context, func_expr)
    cached_expr = func_expr.copy()
    return cached_expr.enable_cache(expr_cache)
1✔
294

1✔
295

296
def enable_cache_on_expression_tree(
    context: "OptimizerContext", expr_tree: AbstractExpression
):
    """Enable caching, in place, on every eligible FunctionExpression in a tree.

    All function expressions are collected first and filtered with
    ``check_expr_validity_for_cache`` before any cache is attached. Each
    surviving expression then receives the cache built by ``enable_cache_init``.

    Args:
        context (OptimizerContext): associated optimizer context.
        expr_tree (AbstractExpression): root of the expression tree to update.
    """
    found = list(expr_tree.find_all(FunctionExpression))
    eligible = [candidate for candidate in found if check_expr_validity_for_cache(candidate)]
    for candidate in eligible:
        candidate.enable_cache(enable_cache_init(context, candidate))
1✔
306

1✔
307

308
def check_expr_validity_for_cache(expr: FunctionExpression):
    """Decide whether caching may be enabled for a function expression.

    The function must be listed in CACHEABLE_FUNCTIONS and must not already
    have a cache. Argument shapes are then checked:

    * one argument: it must be a TupleValueExpression;
    * two arguments: a ConstantValueExpression followed by a
      TupleValueExpression.

    Args:
        expr (FunctionExpression): candidate expression.

    Returns:
        bool: True if the expression is eligible for caching.
    """
    if expr.name not in CACHEABLE_FUNCTIONS or expr.has_cache():
        return False

    args = expr.children
    if len(args) == 1:
        # Normal function that only takes one parameter.
        return isinstance(args[0], TupleValueExpression)
    if len(args) == 2:
        # LLM-based function that takes two parameters.
        return isinstance(args[0], ConstantValueExpression) and isinstance(
            args[1], TupleValueExpression
        )
    # NOTE(review): any other arity is accepted without checking argument
    # types; optimize_cache_key raises RuntimeError for types it cannot handle.
    return True
319

1✔
320

321
def get_expression_execution_cost(
    context: "OptimizerContext", expr: AbstractExpression
) -> float:
    """Estimate the cost of executing ``expr`` from the catalog statistics.

    Only the FunctionExpressions found by ``expr.find_all`` contribute; all
    other expressions are assumed to cost zero. For each function expression,
    the cost stored in the catalog under the function's name is used. If no
    cost entry exists, DEFAULT_FUNCTION_EXPRESSION_COST is charged instead.

    Args:
        context (OptimizerContext): the associated optimizer context.
        expr (AbstractExpression): the expression whose cost needs to be
            computed.

    Returns:
        float: the estimated execution cost (0 when ``expr`` contains no
        function expressions).
    """
    function_exprs = list(expr.find_all(FunctionExpression))
    if not function_exprs:
        # Nothing to price; avoid touching the catalog at all.
        return 0

    # Obtain the catalog once for the whole walk instead of once per function
    # expression. NOTE(review): catalog() may construct a new handle on each
    # call; reusing one here is assumed safe within a single call.
    catalog = context.db.catalog()

    total_cost = 0
    for func_expr in function_exprs:
        cost_entry = catalog.get_function_cost_catalog_entry(func_expr.name)
        total_cost += (
            cost_entry.cost if cost_entry else DEFAULT_FUNCTION_EXPRESSION_COST
        )
    return total_cost
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc