• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #754

04 Sep 2023 09:54PM UTC coverage: 74.807% (-5.5%) from 80.336%
#754

push

circle-ci

jiashenC
update case

8727 of 11666 relevant lines covered (74.81%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.17
/evadb/executor/apply_and_merge_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator
1✔
16

17
from evadb.database import EvaDBDatabase
1✔
18
from evadb.executor.abstract_executor import AbstractExecutor
1✔
19
from evadb.models.storage.batch import Batch
1✔
20
from evadb.plan_nodes.apply_and_merge_plan import ApplyAndMergePlan
1✔
21

22

23
class ApplyAndMergeExecutor(AbstractExecutor):
1✔
24
    """
25
    Apply the function expression to the input data, merge the output of the function
26
    with the input data, and yield the result to the parent. The current implementation
27
    assumes an inner join while merging. Therefore, if the function does not return any
28
    output, the input rows are dropped.
29
    Arguments:
30
        node (AbstractPlan): ApplyAndMergePlan
31

32
    """
33

34
    def __init__(self, db: EvaDBDatabase, node: ApplyAndMergePlan):
1✔
35
        super().__init__(db, node)
1✔
36
        self.func_expr = node.func_expr
1✔
37
        self.do_unnest = node.do_unnest
1✔
38
        self.alias = node.alias
1✔
39

40
    def exec(self, *args, **kwargs) -> Iterator[Batch]:
1✔
41
        child_executor = self.children[0]
×
42
        for batch in child_executor.exec(**kwargs):
×
43
            func_result = self.func_expr.evaluate(batch)
×
44

45
            # persist stats of function expression
46
            if self.func_expr.function_obj and self.func_expr._stats:
×
47
                function_id = self.func_expr.function_obj.row_id
×
48
                self.catalog().upsert_function_cost_catalog_entry(
×
49
                    function_id,
50
                    self.func_expr.function_obj.name,
51
                    self.func_expr._stats.prev_cost,
52
                )
53

54
            output = Batch.merge_column_wise([batch, func_result])
×
55
            if self.do_unnest:
×
56
                output.unnest(func_result.columns)
×
57
                # we reset the index as after unnest there can be duplicate index
58
                output.reset_index()
×
59

60
            yield output
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc