• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / c1f7f0cd-0141-414a-863c-80910314f4e2

pending completion
c1f7f0cd-0141-414a-863c-80910314f4e2

Pull #928

circle-ci

jiashenC
address comments
Pull Request #928: feat: improve the use case doc (v1)

10148 of 10654 relevant lines covered (95.25%)

2.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.43
/evadb/executor/plan_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator
3✔
16

17
from evadb.database import EvaDBDatabase
3✔
18
from evadb.executor.abstract_executor import AbstractExecutor
3✔
19
from evadb.executor.apply_and_merge_executor import ApplyAndMergeExecutor
3✔
20
from evadb.executor.create_executor import CreateExecutor
3✔
21
from evadb.executor.create_index_executor import CreateIndexExecutor
3✔
22
from evadb.executor.create_udf_executor import CreateUDFExecutor
3✔
23
from evadb.executor.delete_executor import DeleteExecutor
3✔
24
from evadb.executor.drop_object_executor import DropObjectExecutor
3✔
25
from evadb.executor.exchange_executor import ExchangeExecutor
3✔
26
from evadb.executor.executor_utils import ExecutorError
3✔
27
from evadb.executor.explain_executor import ExplainExecutor
3✔
28
from evadb.executor.function_scan_executor import FunctionScanExecutor
3✔
29
from evadb.executor.groupby_executor import GroupByExecutor
3✔
30
from evadb.executor.hash_join_executor import HashJoinExecutor
3✔
31
from evadb.executor.insert_executor import InsertExecutor
3✔
32
from evadb.executor.join_build_executor import BuildJoinExecutor
3✔
33
from evadb.executor.lateral_join_executor import LateralJoinExecutor
3✔
34
from evadb.executor.limit_executor import LimitExecutor
3✔
35
from evadb.executor.load_executor import LoadDataExecutor
3✔
36
from evadb.executor.nested_loop_join_executor import NestedLoopJoinExecutor
3✔
37
from evadb.executor.orderby_executor import OrderByExecutor
3✔
38
from evadb.executor.pp_executor import PPExecutor
3✔
39
from evadb.executor.predicate_executor import PredicateExecutor
3✔
40
from evadb.executor.project_executor import ProjectExecutor
3✔
41
from evadb.executor.rename_executor import RenameExecutor
3✔
42
from evadb.executor.sample_executor import SampleExecutor
3✔
43
from evadb.executor.seq_scan_executor import SequentialScanExecutor
3✔
44
from evadb.executor.show_info_executor import ShowInfoExecutor
3✔
45
from evadb.executor.storage_executor import StorageExecutor
3✔
46
from evadb.executor.union_executor import UnionExecutor
3✔
47
from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor
3✔
48
from evadb.models.storage.batch import Batch
3✔
49
from evadb.plan_nodes.abstract_plan import AbstractPlan
3✔
50
from evadb.plan_nodes.types import PlanOprType
3✔
51
from evadb.utils.logging_manager import logger
3✔
52

53

54
class PlanExecutor:
3✔
55
    """
56
    This is an interface between plan tree and execution tree.
57
    We traverse the plan tree and build execution tree from it
58

59
    Arguments:
60
        plan (AbstractPlan): Physical plan tree which needs to be executed
61
        evadb (EvaDBDatabase): database to execute the query on
62
    """
63

64
    def __init__(self, evadb: EvaDBDatabase, plan: AbstractPlan):
3✔
65
        self._db = evadb
3✔
66
        self._plan = plan
3✔
67

68
    def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
3✔
69
        """build the execution tree from plan tree
70

71
        Arguments:
72
            plan {AbstractPlan} -- Input Plan tree
73

74
        Returns:
75
            AbstractExecutor -- Compiled Execution tree
76
        """
77
        root = None
3✔
78
        if plan is None:
3✔
79
            return root
×
80

81
        # Get plan node type
82
        plan_opr_type = plan.opr_type
3✔
83

84
        if plan_opr_type == PlanOprType.SEQUENTIAL_SCAN:
3✔
85
            executor_node = SequentialScanExecutor(db=self._db, node=plan)
3✔
86
        elif plan_opr_type == PlanOprType.UNION:
3✔
87
            executor_node = UnionExecutor(db=self._db, node=plan)
3✔
88
        elif plan_opr_type == PlanOprType.STORAGE_PLAN:
3✔
89
            executor_node = StorageExecutor(db=self._db, node=plan)
3✔
90
        elif plan_opr_type == PlanOprType.PP_FILTER:
3✔
91
            executor_node = PPExecutor(db=self._db, node=plan)
3✔
92
        elif plan_opr_type == PlanOprType.CREATE:
3✔
93
            executor_node = CreateExecutor(db=self._db, node=plan)
3✔
94
        elif plan_opr_type == PlanOprType.RENAME:
3✔
95
            executor_node = RenameExecutor(db=self._db, node=plan)
3✔
96
        elif plan_opr_type == PlanOprType.DROP_OBJECT:
3✔
97
            executor_node = DropObjectExecutor(db=self._db, node=plan)
3✔
98
        elif plan_opr_type == PlanOprType.INSERT:
3✔
99
            executor_node = InsertExecutor(db=self._db, node=plan)
3✔
100
        elif plan_opr_type == PlanOprType.CREATE_UDF:
3✔
101
            executor_node = CreateUDFExecutor(db=self._db, node=plan)
3✔
102
        elif plan_opr_type == PlanOprType.LOAD_DATA:
3✔
103
            executor_node = LoadDataExecutor(db=self._db, node=plan)
3✔
104
        elif plan_opr_type == PlanOprType.GROUP_BY:
3✔
105
            executor_node = GroupByExecutor(db=self._db, node=plan)
3✔
106
        elif plan_opr_type == PlanOprType.ORDER_BY:
3✔
107
            executor_node = OrderByExecutor(db=self._db, node=plan)
3✔
108
        elif plan_opr_type == PlanOprType.LIMIT:
3✔
109
            executor_node = LimitExecutor(db=self._db, node=plan)
3✔
110
        elif plan_opr_type == PlanOprType.SAMPLE:
3✔
111
            executor_node = SampleExecutor(db=self._db, node=plan)
×
112
        elif plan_opr_type == PlanOprType.NESTED_LOOP_JOIN:
3✔
113
            executor_node = NestedLoopJoinExecutor(db=self._db, node=plan)
3✔
114
        elif plan_opr_type == PlanOprType.LATERAL_JOIN:
3✔
115
            logger.warn(
116
                "LateralJoin Executor should not be part of the execution plan."
117
                "Please raise an issue with the current query. Thanks!"
118
            )
119
            executor_node = LateralJoinExecutor(db=self._db, node=plan)
×
120
        elif plan_opr_type == PlanOprType.HASH_JOIN:
3✔
121
            executor_node = HashJoinExecutor(db=self._db, node=plan)
3✔
122
        elif plan_opr_type == PlanOprType.HASH_BUILD:
3✔
123
            executor_node = BuildJoinExecutor(db=self._db, node=plan)
3✔
124
        elif plan_opr_type == PlanOprType.FUNCTION_SCAN:
3✔
125
            executor_node = FunctionScanExecutor(db=self._db, node=plan)
×
126
        elif plan_opr_type == PlanOprType.EXCHANGE:
3✔
127
            executor_node = ExchangeExecutor(db=self._db, node=plan)
1✔
128
            inner_executor = self._build_execution_tree(plan.inner_plan)
1✔
129
            executor_node.build_inner_executor(inner_executor)
1✔
130
        elif plan_opr_type == PlanOprType.PROJECT:
3✔
131
            executor_node = ProjectExecutor(db=self._db, node=plan)
3✔
132
        elif plan_opr_type == PlanOprType.PREDICATE_FILTER:
3✔
133
            executor_node = PredicateExecutor(db=self._db, node=plan)
3✔
134
        elif plan_opr_type == PlanOprType.SHOW_INFO:
3✔
135
            executor_node = ShowInfoExecutor(db=self._db, node=plan)
3✔
136
        elif plan_opr_type == PlanOprType.EXPLAIN:
3✔
137
            executor_node = ExplainExecutor(db=self._db, node=plan)
3✔
138
        elif plan_opr_type == PlanOprType.CREATE_INDEX:
3✔
139
            executor_node = CreateIndexExecutor(db=self._db, node=plan)
3✔
140
        elif plan_opr_type == PlanOprType.APPLY_AND_MERGE:
3✔
141
            executor_node = ApplyAndMergeExecutor(db=self._db, node=plan)
3✔
142
        elif plan_opr_type == PlanOprType.VECTOR_INDEX_SCAN:
3✔
143
            executor_node = VectorIndexScanExecutor(db=self._db, node=plan)
3✔
144
        elif plan_opr_type == PlanOprType.DELETE:
3✔
145
            executor_node = DeleteExecutor(db=self._db, node=plan)
3✔
146

147
        # EXPLAIN does not need to build execution tree for its children
148
        if plan_opr_type != PlanOprType.EXPLAIN:
3✔
149
            # Build Executor Tree for children
150
            for children in plan.children:
3✔
151
                executor_node.append_child(self._build_execution_tree(children))
3✔
152

153
        return executor_node
3✔
154

155
    def execute_plan(
156
        self,
157
        do_not_raise_exceptions: bool = False,
158
        do_not_print_exceptions: bool = False,
159
    ) -> Iterator[Batch]:
160
        """execute the plan tree"""
161
        try:
3✔
162
            execution_tree = self._build_execution_tree(self._plan)
3✔
163
            output = execution_tree.exec()
3✔
164
            if output is not None:
3✔
165
                yield from output
3✔
166
        except Exception as e:
167
            if do_not_raise_exceptions is False:
168
                if do_not_print_exceptions is False:
169
                    logger.exception(str(e))
170
                raise ExecutorError(e)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc