• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 20a9a0f9-edcc-437c-815d-bcc1a2d22b17

10 Nov 2023 04:50AM UTC coverage: 66.644% (-10.2%) from 76.812%
20a9a0f9-edcc-437c-815d-bcc1a2d22b17

push

circleci

americast
update docs

0 of 1 new or added line in 1 file covered. (0.0%)

1354 existing lines in 113 files now uncovered.

8767 of 13155 relevant lines covered (66.64%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.85
/evadb/optimizer/optimizer_utils.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import typing
1✔
16
from typing import List, Tuple
1✔
17

18
if typing.TYPE_CHECKING:
19
    from evadb.optimizer.optimizer_context import OptimizerContext
20

21
from evadb.catalog.catalog_utils import get_table_primary_columns
1✔
22
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
1✔
23
from evadb.catalog.models.function_io_catalog import FunctionIOCatalogEntry
1✔
24
from evadb.catalog.models.function_metadata_catalog import FunctionMetadataCatalogEntry
1✔
25
from evadb.constants import CACHEABLE_FUNCTIONS, DEFAULT_FUNCTION_EXPRESSION_COST
1✔
26
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
1✔
27
from evadb.expression.constant_value_expression import ConstantValueExpression
1✔
28
from evadb.expression.expression_utils import (
1✔
29
    conjunction_list_to_expression_tree,
30
    contains_single_column,
31
    get_columns_in_predicate,
32
    is_simple_predicate,
33
    to_conjunction_list,
34
)
35
from evadb.expression.function_expression import (
1✔
36
    FunctionExpression,
37
    FunctionExpressionCache,
38
)
39
from evadb.expression.tuple_value_expression import TupleValueExpression
1✔
40
from evadb.parser.alias import Alias
1✔
41
from evadb.parser.create_statement import ColumnDefinition
1✔
42
from evadb.utils.kv_cache import DiskKVCache
1✔
43

44

45
def column_definition_to_function_io(col_list: List[ColumnDefinition], is_input: bool):
    """Build one FunctionIOCatalogEntry per parsed column definition.

    A bare ColumnDefinition is accepted as well and is handled as if it were a
    single-element list.

    Arguments:
        col_list(List[ColumnDefinition]): parsed input/output definitions
        is_input(bool): true if the definitions describe inputs, else false

    Returns:
        list of FunctionIOCatalogEntry objects, in the order of col_list
    """
    definitions = [col_list] if isinstance(col_list, ColumnDefinition) else col_list

    io_entries = []
    for definition in definitions:
        assert (
            definition is not None
        ), "Empty column definition while creating function io"
        entry = FunctionIOCatalogEntry(
            definition.name,
            definition.type,
            definition.cci.nullable,
            array_type=definition.array_type,
            array_dimensions=definition.dimension,
            is_input=is_input,
        )
        io_entries.append(entry)
    return io_entries
1✔
69

70

71
def metadata_definition_to_function_metadata(metadata_list: List[Tuple[str, str]]):
    """Build one FunctionMetadataCatalogEntry per parsed metadata definition.

    Arguments:
        metadata_list(List[Tuple[str, str]]): parsed metadata definitions; the
            first two items of each tuple are passed, in order, to
            FunctionMetadataCatalogEntry

    Returns:
        list of FunctionMetadataCatalogEntry objects, in the order of
        metadata_list
    """
    return [
        FunctionMetadataCatalogEntry(metadata[0], metadata[1])
        for metadata in metadata_list
    ]
1✔
86

87

88
def extract_equi_join_keys(
    join_predicate: AbstractExpression,
    left_table_aliases: List[Alias],
    right_table_aliases: List[Alias],
) -> Tuple[List[AbstractExpression], List[AbstractExpression]]:
    """Collect the column pairs that a join predicate compares for equality.

    The predicate is split with to_conjunction_list. A conjunct contributes a
    key pair only when it is a COMPARE_EQUAL expression whose first two
    children are both TUPLE_VALUE expressions, one with a table alias among
    the left aliases and the other with a table alias among the right aliases.
    All other conjuncts are ignored.

    Args:
        join_predicate (AbstractExpression): join predicate to inspect
        left_table_aliases (List[Alias]): aliases of the left side of the join
        right_table_aliases (List[Alias]): aliases of the right side of the join

    Returns:
        Tuple[List[AbstractExpression], List[AbstractExpression]]: (left join
        keys, right join keys); entries at the same index come from the same
        conjunct
    """
    conjuncts = to_conjunction_list(join_predicate)
    left_names = [alias.alias_name for alias in left_table_aliases]
    right_names = [alias.alias_name for alias in right_table_aliases]

    left_keys = []
    right_keys = []
    for conjunct in conjuncts:
        if conjunct.etype != ExpressionType.COMPARE_EQUAL:
            continue

        first = conjunct.children[0]
        second = conjunct.children[1]

        # only column-to-column comparisons are extracted
        both_columns = (
            first.etype == ExpressionType.TUPLE_VALUE
            and second.etype == ExpressionType.TUPLE_VALUE
        )
        if not both_columns:
            continue

        if first.table_alias in left_names and second.table_alias in right_names:
            left_keys.append(first)
            right_keys.append(second)
        elif first.table_alias in right_names and second.table_alias in left_names:
            # operands are written right-side first; swap them so that each
            # key ends up in the list of the side it belongs to
            left_keys.append(second)
            right_keys.append(first)

    return left_keys, right_keys
1✔
126

127

128
def extract_pushdown_predicate(
    predicate: AbstractExpression, column_alias: str
) -> Tuple[AbstractExpression, AbstractExpression]:
    """Split the predicate into a pushdown part and a remaining part.

    A (sub)predicate qualifies for pushdown when both
    contains_single_column(pred, column_alias) and is_simple_predicate(pred)
    hold. If the whole predicate qualifies it is returned unchanged; otherwise
    it is broken up with to_conjunction_list and every conjunct is classified
    on its own.

    Args:
        predicate (AbstractExpression): predicate that needs to be decomposed
        column_alias (str): column alias to extract the predicate for
    Returns:
        Tuple[AbstractExpression, AbstractExpression]: (pushdown predicate,
        remaining predicate); (None, None) when predicate is None
    """
    if predicate is None:
        return None, None

    # Fast path: the entire predicate can be pushed down, nothing remains.
    if contains_single_column(predicate, column_alias) and is_simple_predicate(
        predicate
    ):
        return predicate, None

    pushable = []
    remaining = []
    for conjunct in to_conjunction_list(predicate):
        qualifies = contains_single_column(
            conjunct, column_alias
        ) and is_simple_predicate(conjunct)
        target = pushable if qualifies else remaining
        target.append(conjunct)

    return (
        conjunction_list_to_expression_tree(pushable),
        conjunction_list_to_expression_tree(remaining),
    )
160

161

162
def extract_pushdown_predicate_for_alias(
    predicate: AbstractExpression, aliases: List[Alias]
):
    """Extract predicate that can be pushed down based on the input aliases.

    Atomic predicates on the table columns that are the subset of the input aliases are
    considered as candidates for pushdown.

    Args:
        predicate (AbstractExpression): input predicate
        aliases (List[Alias]): aliases for which predicate can be pushed;
            only their alias_name is used

    Returns:
        tuple of (pushdown predicate, remaining predicate), each built with
        conjunction_list_to_expression_tree; (None, None) when predicate is
        None
    """
    if predicate is None:
        return None, None

    # The allowed alias names do not depend on the conjunct being examined, so
    # the set is built once instead of once per loop iteration.
    allowed_aliases = {alias.alias_name for alias in aliases}

    pushdown_preds = []
    rem_pred = []
    for pred in to_conjunction_list(predicate):
        # The text before the first "." of each column name is taken as the
        # table alias of that column.
        table_aliases = {col.split(".")[0] for col in get_columns_in_predicate(pred)}
        if table_aliases.issubset(allowed_aliases):
            pushdown_preds.append(pred)
        else:
            rem_pred.append(pred)

    return (
        conjunction_list_to_expression_tree(pushdown_preds),
        conjunction_list_to_expression_tree(rem_pred),
    )
192

193

194
def optimize_cache_key_for_tuple_value_expression(
    context: "OptimizerContext", tv_expr: TupleValueExpression
):
    """Compute the optimized cache key for a TupleValueExpression.

    When the expression's col_object is a ColumnCatalogEntry, the key is
    replaced by one TupleValueExpression per column returned by
    get_table_primary_columns for the owning table, each keeping the table
    alias of tv_expr. Otherwise the expression itself is the key.

    Args:
        context (OptimizerContext): associated optimizer context; used to
            reach the catalog
        tv_expr (TupleValueExpression): expression currently used as key

    Returns:
        list of TupleValueExpression objects to use as cache key
    """
    catalog = context.db.catalog()
    column_entry = tv_expr.col_object

    if not isinstance(column_entry, ColumnCatalogEntry):
        # No catalog column behind this expression; keep it as the key.
        return [tv_expr]

    table_entry = catalog.get_table_catalog_entry(column_entry.table_name)
    primary_key_exprs = []
    for primary_col in get_table_primary_columns(table_entry):
        primary_col_entry = catalog.get_column_catalog_entry(
            table_entry, primary_col.name
        )
        primary_key_exprs.append(
            TupleValueExpression(
                name=primary_col.name,
                table_alias=tv_expr.table_alias,
                col_object=primary_col_entry,
                col_alias=f"{tv_expr.table_alias}.{primary_col.name}",
            )
        )
    return primary_key_exprs
×
218

219

220
def optimize_cache_key_for_constant_value_expression(
    context: "OptimizerContext", cv_expr: ConstantValueExpression
):
    """Compute the optimized cache key for a ConstantValueExpression.

    Args:
        context (OptimizerContext): not used by this function; accepted so
            the signature matches
            optimize_cache_key_for_tuple_value_expression
        cv_expr (ConstantValueExpression): expression currently used as key

    Returns:
        single-element list containing cv_expr itself
    """
    # A constant value expression needs no additional optimization.
    return [cv_expr]
×
225

226

227
def optimize_cache_key(context: "OptimizerContext", expr: FunctionExpression):
    """Optimize the cache key

    It tries to reduce the caching overhead by replacing the caching key with
    logically equivalent key. For instance, frame data can be replaced with frame id.

    Each child of the function expression is dispatched on its exact type to
    the matching per-type optimizer, and the resulting keys are concatenated
    in child order.

    Args:
        context (OptimizerContext): associated optimizer context, forwarded
            to the per-type optimizers
        expr (FunctionExpression): expression to optimize the caching key for.

    Returns:
        list of expressions forming the optimized cache key

    Raises:
        RuntimeError: if a child is neither exactly a TupleValueExpression
            nor exactly a ConstantValueExpression

    Example:
        Yolo(data) -> return id

    Todo: Optimize complex expression
        FaceDet(Crop(data, bbox)) -> return

    """
    key_optimizers = {
        TupleValueExpression: optimize_cache_key_for_tuple_value_expression,
        ConstantValueExpression: optimize_cache_key_for_constant_value_expression,
    }

    optimized_keys = []
    for child in expr.children:
        child_type = type(child)
        # Lookup is by exact type, so subclasses of the supported expression
        # types are not matched.
        optimizer = key_optimizers.get(child_type)
        if optimizer is None:
            raise RuntimeError(
                f"Optimize cache key of {child_type} is not implemented"
            )
        optimized_keys.extend(optimizer(context, child))

    return optimized_keys
×
257

258

259
def enable_cache_init(
    context: "OptimizerContext", func_expr: FunctionExpression
) -> FunctionExpressionCache:
    """Create the FunctionExpressionCache for a function expression.

    The cache key comes from optimize_cache_key. The backing store is a
    DiskKVCache at the cache_path of the function-cache catalog entry that is
    looked up by the expression's signature; the entry is inserted into the
    catalog when the lookup returns nothing.

    Args:
        context (OptimizerContext): associated optimizer context
        func_expr (FunctionExpression): function expression to build the
            cache for

    Returns:
        FunctionExpressionCache: cache holding the key tuple and the store
    """
    cache_key = optimize_cache_key(context, func_expr)
    if cache_key == func_expr.children:
        # The optimized key equals the expression's own children; this case
        # is recorded as [None] rather than as the children themselves.
        cache_key = [None]

    catalog = context.db.catalog()
    signature = func_expr.signature()
    cache_entry = catalog.get_function_cache_catalog_entry_by_name(signature) or (
        catalog.insert_function_cache_catalog_entry(func_expr)
    )

    return FunctionExpressionCache(
        key=tuple(cache_key), store=DiskKVCache(cache_entry.cache_path)
    )
×
276

277

278
def enable_cache(
    context: "OptimizerContext", func_expr: FunctionExpression
) -> FunctionExpression:
    """Return a cache-enabled copy of a function expression.

    The cache is built by enable_cache_init, which optimizes the cache key and
    makes sure a cache entry for the expression exists in the catalog. The
    cache is then enabled on a copy of func_expr, not on func_expr itself.

    Args:
        context (OptimizerContext): associated optimizer context
        func_expr (FunctionExpression): The function expression to enable cache for.

    Returns:
        FunctionExpression: The function expression with cache enabled.
    """
    cache = enable_cache_init(context, func_expr)
    cached_expr = func_expr.copy()
    return cached_expr.enable_cache(cache)
×
295

296

297
def enable_cache_on_expression_tree(
    context: "OptimizerContext", expr_tree: AbstractExpression
):
    """Enable cache on every eligible function expression of a tree.

    Function expressions are collected with expr_tree.find_all and kept only
    if check_expr_validity_for_cache accepts them. The cache is enabled on
    the found expression objects themselves (no copy is made) and nothing is
    returned.

    Args:
        context (OptimizerContext): associated optimizer context
        expr_tree (AbstractExpression): expression tree to search
    """
    found = list(expr_tree.find_all(FunctionExpression))
    # Eligibility of all candidates is decided before any cache is attached.
    eligible = [expr for expr in found if check_expr_validity_for_cache(expr)]
    for func_expr in eligible:
        cache = enable_cache_init(context, func_expr)
        func_expr.enable_cache(cache)
×
307

308

309
def check_expr_validity_for_cache(expr: FunctionExpression):
    """Tell whether cache can be enabled for a function expression.

    The expression must be named in CACHEABLE_FUNCTIONS and must not already
    have a cache. On top of that, the argument shape is checked for one- and
    two-argument calls:
      - one argument: it must be a TupleValueExpression
      - two arguments: a ConstantValueExpression followed by a
        TupleValueExpression
    Calls with any other number of arguments get no shape check.

    Args:
        expr (FunctionExpression): function expression to check

    Returns:
        bool: True if the expression qualifies for caching
    """
    cacheable = expr.name in CACHEABLE_FUNCTIONS and not expr.has_cache()

    args = expr.children
    if len(args) == 1:
        # Normal function that only takes one parameter.
        shape_ok = isinstance(args[0], TupleValueExpression)
    elif len(args) == 2:
        # LLM-based function that takes two parameters.
        shape_ok = isinstance(args[0], ConstantValueExpression) and isinstance(
            args[1], TupleValueExpression
        )
    else:
        # NOTE(review): zero or 3+ arguments are not shape-checked, so only
        # the name/has_cache test decides -- confirm this is intended.
        shape_ok = True

    return cacheable and shape_ok
×
320

321

322
def get_expression_execution_cost(
    context: "OptimizerContext", expr: AbstractExpression
) -> float:
    """
    Estimate the cost of executing the given abstract expression from the
    statistics in the catalog. Only FunctionExpression nodes contribute; every
    other expression is assumed to cost zero. For each FunctionExpression
    found in the tree, the cost recorded in the catalog under the function's
    name is used, falling back to DEFAULT_FUNCTION_EXPRESSION_COST when the
    catalog has no entry.

    Args:
        context (OptimizerContext): the associated optimizer context
        expr (AbstractExpression): The AbstractExpression object whose cost
        needs to be computed.

    Returns:
        float: The estimated cost of executing the expression; 0 when the
        tree contains no FunctionExpression.
    """
    accumulated_cost = 0
    # sum the cost of every function expression found in the tree
    for func_expr in expr.find_all(FunctionExpression):
        catalog = context.db.catalog()
        entry = catalog.get_function_cost_catalog_entry(func_expr.name)
        accumulated_cost += entry.cost if entry else DEFAULT_FUNCTION_EXPRESSION_COST
    return accumulated_cost
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc