• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #758

04 Sep 2023 08:37PM UTC coverage: 0.0% (-78.3%) from 78.333%
#758

push

circle-ci

hershd23
Increased underline length at line 75 in text_summarization.rst
	modified:   docs/source/benchmarks/text_summarization.rst

0 of 11303 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/optimizer/optimizer_tasks.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from __future__ import annotations
×
16

17
from abc import abstractmethod
×
18
from enum import IntEnum, auto
×
19
from typing import TYPE_CHECKING, List
×
20

21
from evadb.optimizer.binder import Binder
×
22
from evadb.optimizer.group import Group
×
23
from evadb.optimizer.group_expression import GroupExpression
×
24
from evadb.optimizer.property import PropertyType
×
25
from evadb.optimizer.rules.rules_base import Rule
×
26
from evadb.utils.logging_manager import logger
×
27

28
if TYPE_CHECKING:
29
    from evadb.optimizer.optimizer_context import OptimizerContext
30

31

32
class OptimizerTaskType(IntEnum):
    """Integer tags identifying each kind of optimizer task.

    Values are assigned by ``auto()`` in declaration order, so the first
    member is 1 and each following member is one greater.
    """

    # Rewrite passes.
    TOP_DOWN_REWRITE = auto()
    BOTTOM_UP_REWRITE = auto()

    # Optimization / costing tasks.
    OPTIMIZE_EXPRESSION = auto()
    OPTIMIZE_GROUP = auto()
    OPTIMIZE_INPUTS = auto()
    APPLY_RULE = auto()

    # Tag for group exploration.
    EXPLORE_GROUP = auto()
42

43

44
class OptimizerTask:
    """Common base for optimizer tasks.

    Holds the task's type tag and the optimizer context it operates on;
    concrete tasks supply the actual work in ``execute``.
    """

    def __init__(
        self, optimizer_context: OptimizerContext, task_type: OptimizerTaskType
    ):
        """Store the context and the type tag for this task.

        Args:
            optimizer_context: context later exposed through the
                ``optimizer_context`` property.
            task_type: ``OptimizerTaskType`` member identifying the task kind.
        """
        self._optimizer_context = optimizer_context
        self._task_type = task_type

    @property
    def optimizer_context(self):
        """Return the optimizer context given at construction time."""
        return self._optimizer_context

    @abstractmethod
    def execute(self):
        """Perform the task; concrete subclasses are expected to override.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        # This class does not derive from ABC, so the decorator alone does not
        # prevent instantiation; the explicit raise is what signals a missing
        # override at call time.
        raise NotImplementedError
58

59

60
class TopDownRewrite(OptimizerTask):
    """Rewrite task that processes an expression before its children.

    The task tries the applicable rewrite rules on ``root_expr``. As soon as
    one rule produces a replacement plan, the expression is replaced and a new
    ``TopDownRewrite`` is scheduled for the replacement. If no rule produces a
    plan, a ``TopDownRewrite`` is scheduled for each child group.
    """

    def __init__(
        self,
        root_expr: GroupExpression,
        rule_set: List[Rule],
        optimizer_context: OptimizerContext,
    ):
        """Create a top-down rewrite task.

        Args:
            root_expr: group expression to rewrite.
            rule_set: rewrite rules to consider for the expression.
            optimizer_context: context providing the memo and the task stack.
        """
        self.root_expr = root_expr
        self.rule_set = rule_set
        super().__init__(optimizer_context, OptimizerTaskType.TOP_DOWN_REWRITE)

    def execute(self):
        """Apply the first productive rule, or descend into the children.

        Raises:
            AssertionError: if a rule yields more than one plan for a match.
        """
        root_expr = self.root_expr
        context = self.optimizer_context

        # Keep only rules not yet explored on this expression whose pattern
        # root matches the expression's operator.
        valid_rules = [
            rule
            for rule in self.rule_set
            if not root_expr.is_rule_explored(rule.rule_type)
            and rule.top_match(root_expr.opr)
        ]

        # sort the rules by promise (ascending)
        valid_rules = sorted(valid_rules, key=lambda x: x.promise())

        for rule in valid_rules:
            binder = Binder(root_expr, rule.pattern, context.memo)
            for match in binder:
                if not rule.check(match, context):
                    continue
                plans = list(rule.apply(match, context))
                assert (
                    len(plans) <= 1
                ), "Rewrite rule cannot generate more than one alternate plan."
                if plans:
                    new_expr = context.replace_expression(
                        plans[0], root_expr.group_id
                    )
                    context.task_stack.push(
                        TopDownRewrite(new_expr, self.rule_set, context)
                    )
                    # The root has changed so we cannot apply more rules to the
                    # same root, hence return
                    return

                # Reached only when a match passed the check but the rule
                # produced no plan for it.
                root_expr.mark_rule_explored(rule.rule_type)

        # No rule rewrote the root: continue the rewrite on the first logical
        # expression of every child group.
        for child in root_expr.children:
            child_expr = context.memo.groups[child].logical_exprs[0]
            context.task_stack.push(
                TopDownRewrite(child_expr, self.rule_set, context)
            )
108

109

110
class BottomUpRewrite(OptimizerTask):
    """Rewrite task that processes an expression after its children.

    On its first run (``children_explored`` false) the task re-schedules
    itself with ``children_explored=True`` and then schedules a
    ``BottomUpRewrite`` for each child group. On the second run it tries the
    applicable rules on ``root_expr``.
    """

    def __init__(
        self,
        root_expr: GroupExpression,
        rule_set: List[Rule],
        optimizer_context: OptimizerContext,
        children_explored=False,
    ):
        """Create a bottom-up rewrite task.

        Args:
            root_expr: group expression to rewrite.
            rule_set: rewrite rules to consider for the expression.
            optimizer_context: context providing the memo and the task stack.
            children_explored: whether the child tasks have already been
                scheduled for this expression.
        """
        super().__init__(optimizer_context, OptimizerTaskType.BOTTOM_UP_REWRITE)
        self._children_explored = children_explored
        self.root_expr = root_expr
        self.rule_set = rule_set

    def execute(self):
        """Schedule the children first, then rewrite the root expression.

        Raises:
            AssertionError: if a rule yields more than one plan for a match.
        """
        root_expr = self.root_expr
        context = self.optimizer_context

        if not self._children_explored:
            # Push the follow-up task for this root before the child tasks.
            # NOTE(review): presumably the task stack is LIFO, so the children
            # run before the follow-up; confirm against the stack
            # implementation.
            context.task_stack.push(
                BottomUpRewrite(root_expr, self.rule_set, context, True)
            )
            for child in root_expr.children:
                child_expr = context.memo.groups[child].logical_exprs[0]
                context.task_stack.push(
                    BottomUpRewrite(child_expr, self.rule_set, context)
                )
            return

        # Keep only rules not yet explored on this expression whose pattern
        # root matches the expression's operator.
        valid_rules = [
            rule
            for rule in self.rule_set
            if not root_expr.is_rule_explored(rule.rule_type)
            and rule.top_match(root_expr.opr)
        ]

        # sort the rules by promise (ascending); the sorted list must be
        # assigned back, since sorted() does not modify its argument
        valid_rules = sorted(valid_rules, key=lambda x: x.promise())

        for rule in valid_rules:
            binder = Binder(root_expr, rule.pattern, context.memo)
            for match in binder:
                if not rule.check(match, context):
                    continue
                logger.info(
                    "In BottomUp, Rule {} matched for {}".format(rule, root_expr)
                )
                plans = list(rule.apply(match, context))
                assert (
                    len(plans) <= 1
                ), "Rewrite rule cannot generate more than one alternate plan."
                if plans:
                    new_expr = context.replace_expression(
                        plans[0], root_expr.group_id
                    )
                    logger.info("After rewriting {}".format(root_expr))
                    context.task_stack.push(
                        BottomUpRewrite(new_expr, self.rule_set, context)
                    )
                    # The root has changed so we cannot apply more rules to the
                    # same root, hence return
                    return
            # All matches of this rule were tried without producing a plan.
            root_expr.mark_rule_explored(rule.rule_type)
170

171

172
class OptimizeExpression(OptimizerTask):
    """Task that schedules an ``ApplyRule`` task per rule matching an expression."""

    def __init__(
        self,
        root_expr: GroupExpression,
        optimizer_context: OptimizerContext,
        explore: bool,
    ):
        """Create the task.

        Args:
            root_expr: group expression to optimize.
            optimizer_context: context providing the rules manager and the
                task stack.
            explore: when true, only logical rules are considered.
        """
        self.root_expr = root_expr
        self.explore = explore
        super().__init__(optimizer_context, OptimizerTaskType.OPTIMIZE_EXPRESSION)

    def execute(self):
        """Push an ``ApplyRule`` task for every rule matching the root operator."""
        context = self.optimizer_context
        rules_manager = context.rules_manager

        if self.explore:
            # if exploring, we don't need to consider implementation rules
            candidates = rules_manager.logical_rules
        else:
            # Concatenate into a new list instead of extending in place, which
            # would modify the rules manager's internal logical rule list.
            candidates = (
                rules_manager.logical_rules + rules_manager.implementation_rules
            )

        matching = [rule for rule in candidates if rule.top_match(self.root_expr.opr)]
        matching.sort(key=lambda rule: rule.promise())

        for rule in matching:
            # apply the rule
            context.task_stack.push(
                ApplyRule(rule, self.root_expr, context, self.explore)
            )

            # explore the input group if necessary
            # for idx, child in enumerate(rule.pattern.children):
            #    if len(child.children):
            #        child_grp_id = self.root_expr.children[idx]
            #        group = self.optimizer_context.memo.get_group_by_id(child_grp_id)
            #        self.optimizer_context.task_stack.push(
            #            ExploreGroup(group, self.optimizer_context)
            #        )
214

215

216
class ApplyRule(OptimizerTask):
    """Apply a single transformation or implementation rule to an expression."""

    def __init__(
        self,
        rule: Rule,
        root_expr: GroupExpression,
        optimizer_context: OptimizerContext,
        explore: bool,
    ):
        """Create the task.

        Args:
            rule: rule to apply.
            root_expr: group expression the rule is applied to.
            optimizer_context: context providing the memo and the task stack.
            explore: flag forwarded to the ``OptimizeExpression`` tasks
                scheduled for new logical expressions.
        """
        self.rule = rule
        self.root_expr = root_expr
        self.explore = explore
        super().__init__(optimizer_context, OptimizerTaskType.APPLY_RULE)

    def execute(self):
        """Apply the rule to every valid binding and schedule follow-up tasks."""
        context = self.optimizer_context
        rule = self.rule

        # Nothing to do when this rule was already explored on the expression.
        if self.root_expr.is_rule_explored(rule.rule_type):
            return

        for match in Binder(self.root_expr, rule.pattern, context.memo):
            if not rule.check(match, context):
                continue
            for plan in rule.apply(match, context):
                new_expr = context.add_opr_to_group(plan, self.root_expr.group_id)
                if new_expr.is_logical():
                    # logical result: optimize the new expression further
                    follow_up = OptimizeExpression(new_expr, context, self.explore)
                else:
                    # physical result: cost the new expression
                    follow_up = OptimizeInputs(new_expr, context)
                context.task_stack.push(follow_up)

        self.root_expr.mark_rule_explored(rule.rule_type)
259

260

261
class OptimizeGroup(OptimizerTask):
    """Task that schedules optimization of every expression in a group."""

    def __init__(self, group: Group, optimizer_context: OptimizerContext):
        """Create the task.

        Args:
            group: group whose expressions should be optimized.
            optimizer_context: context providing the task stack.
        """
        self.group = group
        super().__init__(optimizer_context, OptimizerTaskType.OPTIMIZE_GROUP)

    def execute(self):
        """Schedule tasks for the group unless it already has a best expression."""
        context = self.optimizer_context

        # Todo: Get the property from the context
        already_optimized = self.group.get_best_expr(PropertyType.DEFAULT)
        if already_optimized:
            return

        # Logical expressions are optimized with implementation rules included
        # (explore=False).
        for logical_expr in self.group.logical_exprs:
            task = OptimizeExpression(logical_expr, context, explore=False)
            context.task_stack.push(task)

        # Physical expressions are costed.
        for physical_expr in self.group.physical_exprs:
            task = OptimizeInputs(physical_expr, context)
            context.task_stack.push(task)
282

283

284
class OptimizeInputs(OptimizerTask):
    """Task that costs an expression once all of its child groups are costed."""

    def __init__(self, root_expr: GroupExpression, optimizer_context: OptimizerContext):
        """Create the task.

        Args:
            root_expr: group expression to cost.
            optimizer_context: context providing the memo, the cost model and
                the task stack.
        """
        self.root_expr = root_expr
        super().__init__(optimizer_context, OptimizerTaskType.OPTIMIZE_INPUTS)

    def execute(self):
        """Sum the children's best costs and record the expression's cost.

        If some child group has no best expression yet, this task is pushed
        again together with an ``OptimizeGroup`` for that child, and the
        method returns without recording a cost.
        """
        context = self.optimizer_context
        memo = context.memo
        own_group = memo.get_group_by_id(self.root_expr.group_id)

        total_cost = 0
        for child_id in self.root_expr.children:
            child_group = memo.get_group_by_id(child_id)
            if not child_group.get_best_expr(PropertyType.DEFAULT):
                # Child not optimized yet: schedule a retry of this task, then
                # the child's optimization on top of it.
                context.task_stack.push(OptimizeInputs(self.root_expr, context))
                context.task_stack.push(OptimizeGroup(child_group, context))
                return
            # Note: May never get hit when using EvaDB on Ray
            total_cost += child_group.get_best_expr_cost(PropertyType.DEFAULT)

        total_cost += context.cost_model.calculate_cost(self.root_expr)
        own_group.add_expr_cost(self.root_expr, PropertyType.DEFAULT, total_cost)
309

310

311
# class ExploreGroup(OptimizerTask):
312
#     """
313
#     Derive all logical group-expression for matching a pattern
314
#     """
315

316
#     def __init__(self, group: Group, optimizer_context: OptimizerContext):
317
#         self.group = group
318
#         super().__init__(optimizer_context, OptimizerTaskType.EXPLORE_GROUP)
319

320
#     def execute(self):
321
#         # return if the group is already explored
322
#         if self.group.is_explored():
323
#             return
324

325
#         # explore all the logical expression
326
#         for expr in self.group.logical_exprs:
327
#             self.optimizer_context.task_stack.push(
328
#                 OptimizeExpression(expr, self.optimizer_context, explore=True)
329
#             )
330

331
#         # mark the group explored
332
#         self.group.mark_explored()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc