• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / e4f99d26-a857-4f78-b9e8-041e4254b9e1

18 Aug 2023 05:49AM UTC coverage: 94.434% (-0.8%) from 95.277%
e4f99d26-a857-4f78-b9e8-041e4254b9e1

Pull #935

circle-ci

xzdandy
remove print
Pull Request #935: Ludwig-based model train and tune support.

92 of 92 new or added lines in 10 files covered. (100.0%)

10264 of 10869 relevant lines covered (94.43%)

1.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.02
/evadb/executor/plan_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator, Union
2✔
16

17
from evadb.database import EvaDBDatabase
2✔
18
from evadb.executor.abstract_executor import AbstractExecutor
2✔
19
from evadb.executor.apply_and_merge_executor import ApplyAndMergeExecutor
2✔
20
from evadb.executor.create_database_executor import CreateDatabaseExecutor
2✔
21
from evadb.executor.create_executor import CreateExecutor
2✔
22
from evadb.executor.create_index_executor import CreateIndexExecutor
2✔
23
from evadb.executor.create_udf_executor import CreateUDFExecutor
2✔
24
from evadb.executor.delete_executor import DeleteExecutor
2✔
25
from evadb.executor.drop_object_executor import DropObjectExecutor
2✔
26
from evadb.executor.exchange_executor import ExchangeExecutor
2✔
27
from evadb.executor.executor_utils import ExecutorError
2✔
28
from evadb.executor.explain_executor import ExplainExecutor
2✔
29
from evadb.executor.function_scan_executor import FunctionScanExecutor
2✔
30
from evadb.executor.groupby_executor import GroupByExecutor
2✔
31
from evadb.executor.hash_join_executor import HashJoinExecutor
2✔
32
from evadb.executor.insert_executor import InsertExecutor
2✔
33
from evadb.executor.join_build_executor import BuildJoinExecutor
2✔
34
from evadb.executor.lateral_join_executor import LateralJoinExecutor
2✔
35
from evadb.executor.limit_executor import LimitExecutor
2✔
36
from evadb.executor.load_executor import LoadDataExecutor
2✔
37
from evadb.executor.nested_loop_join_executor import NestedLoopJoinExecutor
2✔
38
from evadb.executor.orderby_executor import OrderByExecutor
2✔
39
from evadb.executor.pp_executor import PPExecutor
2✔
40
from evadb.executor.predicate_executor import PredicateExecutor
2✔
41
from evadb.executor.project_executor import ProjectExecutor
2✔
42
from evadb.executor.rename_executor import RenameExecutor
2✔
43
from evadb.executor.sample_executor import SampleExecutor
2✔
44
from evadb.executor.seq_scan_executor import SequentialScanExecutor
2✔
45
from evadb.executor.show_info_executor import ShowInfoExecutor
2✔
46
from evadb.executor.storage_executor import StorageExecutor
2✔
47
from evadb.executor.union_executor import UnionExecutor
2✔
48
from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor
2✔
49
from evadb.models.storage.batch import Batch
2✔
50
from evadb.parser.create_statement import CreateDatabaseStatement
2✔
51
from evadb.parser.statement import AbstractStatement
2✔
52
from evadb.plan_nodes.abstract_plan import AbstractPlan
2✔
53
from evadb.plan_nodes.types import PlanOprType
2✔
54
from evadb.utils.logging_manager import logger
2✔
55

56

57
class PlanExecutor:
    """
    Interface between the plan tree and the execution tree.

    The plan tree is traversed recursively and a matching tree of executor
    nodes is built from it; ``execute_plan`` then runs the root executor.

    Arguments:
        evadb (EvaDBDatabase): database to execute the query on
        plan (AbstractPlan): Physical plan tree which needs to be executed
    """

    def __init__(self, evadb: EvaDBDatabase, plan: AbstractPlan):
        self._db = evadb
        self._plan = plan

    def _build_execution_tree(
        self, plan: Union[AbstractPlan, AbstractStatement]
    ) -> Union[AbstractExecutor, None]:
        """Build the execution tree from a plan tree.

        Arguments:
            plan {AbstractPlan} -- Input plan tree. A CreateDatabaseStatement
                (a parser statement rather than a plan node) is also accepted.

        Returns:
            AbstractExecutor -- Compiled execution tree, or None when
                ``plan`` is None

        Raises:
            ExecutorError -- if the plan's operator type has no executor
                mapped to it in this method
        """
        if plan is None:
            return None

        # First handle cases when the plan is actually a parser statement
        if isinstance(plan, CreateDatabaseStatement):
            return CreateDatabaseExecutor(db=self._db, node=plan)

        # Get plan node type
        plan_opr_type = plan.opr_type

        if plan_opr_type == PlanOprType.SEQUENTIAL_SCAN:
            executor_node = SequentialScanExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.UNION:
            executor_node = UnionExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.STORAGE_PLAN:
            executor_node = StorageExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.PP_FILTER:
            executor_node = PPExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.CREATE:
            executor_node = CreateExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.RENAME:
            executor_node = RenameExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.DROP_OBJECT:
            executor_node = DropObjectExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.INSERT:
            executor_node = InsertExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.CREATE_UDF:
            executor_node = CreateUDFExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.LOAD_DATA:
            executor_node = LoadDataExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.GROUP_BY:
            executor_node = GroupByExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.ORDER_BY:
            executor_node = OrderByExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.LIMIT:
            executor_node = LimitExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.SAMPLE:
            executor_node = SampleExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.NESTED_LOOP_JOIN:
            executor_node = NestedLoopJoinExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.LATERAL_JOIN:
            # The executor is still built; the warning only flags that this
            # node type is not expected to appear in an execution plan.
            logger.warning(
                "LateralJoin Executor should not be part of the execution plan. "
                "Please raise an issue with the current query. Thanks!"
            )
            executor_node = LateralJoinExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.HASH_JOIN:
            executor_node = HashJoinExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.HASH_BUILD:
            executor_node = BuildJoinExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.FUNCTION_SCAN:
            executor_node = FunctionScanExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.EXCHANGE:
            executor_node = ExchangeExecutor(db=self._db, node=plan)
            # The exchange node carries a separate inner plan whose executor
            # tree is built here and handed to the exchange executor.
            inner_executor = self._build_execution_tree(plan.inner_plan)
            executor_node.build_inner_executor(inner_executor)
        elif plan_opr_type == PlanOprType.PROJECT:
            executor_node = ProjectExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.PREDICATE_FILTER:
            executor_node = PredicateExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.SHOW_INFO:
            executor_node = ShowInfoExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.EXPLAIN:
            executor_node = ExplainExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.CREATE_INDEX:
            executor_node = CreateIndexExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.APPLY_AND_MERGE:
            executor_node = ApplyAndMergeExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.VECTOR_INDEX_SCAN:
            executor_node = VectorIndexScanExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.DELETE:
            executor_node = DeleteExecutor(db=self._db, node=plan)
        else:
            # Without this branch an unmapped operator type would only fail
            # further down with an UnboundLocalError on ``executor_node``.
            raise ExecutorError(
                f"Unsupported plan operator type: {plan_opr_type}"
            )

        # EXPLAIN does not need to build execution tree for its children
        if plan_opr_type != PlanOprType.EXPLAIN:
            # Build Executor Tree for children
            for child_plan in plan.children:
                executor_node.append_child(self._build_execution_tree(child_plan))

        return executor_node

    def execute_plan(
        self,
        do_not_raise_exceptions: bool = False,
        do_not_print_exceptions: bool = False,
    ) -> Iterator[Batch]:
        """Build the execution tree for the stored plan and stream its output.

        This is a generator function: the tree is built and executed only
        once the returned iterator is consumed, so any error surfaces during
        iteration rather than at call time.

        Arguments:
            do_not_raise_exceptions (bool): when True, an exception raised
                while building or running the tree is neither logged nor
                re-raised; iteration simply ends
            do_not_print_exceptions (bool): when True, the exception is not
                logged before being re-raised. Only consulted when
                ``do_not_raise_exceptions`` is False

        Yields:
            Batch: items produced by the root executor's ``exec()``; nothing
                is yielded when ``exec()`` returns None

        Raises:
            ExecutorError: wraps any exception raised while building or
                running the tree, unless ``do_not_raise_exceptions`` is True
        """
        try:
            execution_tree = self._build_execution_tree(self._plan)
            output = execution_tree.exec()
            if output is not None:
                yield from output
        except Exception as e:
            # ``is False`` is kept deliberately so that only an explicit
            # False enables raising/logging, exactly as before.
            if do_not_raise_exceptions is False:
                if do_not_print_exceptions is False:
                    logger.exception(str(e))
                # Chain explicitly so the original error is the __cause__.
                raise ExecutorError(e) from e
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc