• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 91cae4db-ee39-47cb-81ee-210e6aabfa97

12 Oct 2023 08:31AM UTC coverage: 68.499% (+68.5%) from 0.0%
91cae4db-ee39-47cb-81ee-210e6aabfa97

Pull #1280

circle-ci

xzdandy
Minor
Pull Request #1280: Rebatch Optimization

50 of 50 new or added lines in 2 files covered. (100.0%)

8637 of 12609 relevant lines covered (68.5%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/executor/rebatch_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import itertools
×
16
from typing import Iterator
×
17

18
from evadb.database import EvaDBDatabase
×
19
from evadb.executor.abstract_executor import AbstractExecutor
×
20
from evadb.models.storage.batch import Batch
×
21
from evadb.plan_nodes.rebatch_plan import RebatchPlan
×
22
from evadb.utils.generic_utils import get_size
×
23

24

25
class RebatchExecutor(AbstractExecutor):
    """
    Rebatch the data pipeline.

    Regroups the batches produced by the single child executor into batches
    with a uniform row count.

    Arguments:
        db (EvaDBDatabase): The database handle, forwarded to the base executor
        node (RebatchPlan): The Rebatch Plan; its ``batch_mem_size`` and
            ``batch_size`` fields determine the target rows per output batch
    """

    def __init__(self, db: EvaDBDatabase, node: RebatchPlan):
        super().__init__(db, node)
        # Target per-batch size as measured by get_size (used to derive a row count).
        self.batch_mem_size = node.batch_mem_size
        # Target row count per output batch.
        self.batch_size = node.batch_size

    def exec(self, *args, **kwargs) -> Iterator[Batch]:
        """Yield the child's rows re-chunked into batches of a fixed row count.

        The row count is the largest of 1, ``self.batch_size`` and
        ``self.batch_mem_size`` divided by the ``get_size`` of the first row of
        the first child batch. Every yielded batch except possibly the last
        has exactly that many rows; the last one holds the leftover rows.

        Yields nothing if the child yields nothing, and never yields a
        trailing batch with zero rows.
        """
        child_iter = self.children[0].exec(**kwargs)

        # An exhausted child must end this generator cleanly: a StopIteration
        # escaping from next() inside a generator is turned into a
        # RuntimeError (PEP 479).
        try:
            first_batch = next(child_iter)
        except StopIteration:
            return

        # Calculate the proper batch size, use the larger one when both
        # batch_mem_size and batch_size are set. The per-row footprint is
        # estimated from the first row of the first batch.
        sample_size = get_size(first_batch[:1])
        batch_size = int(max(1, self.batch_size, self.batch_mem_size / sample_size))

        # Rows buffered so far and the batches holding them.
        current_size = 0
        current_batches = []
        # Re-attach the already-consumed first batch in front of the rest.
        for batch in itertools.chain([first_batch], child_iter):
            current_batches.append(batch)
            current_size += len(batch)
            if batch_size <= current_size:
                # Merge the buffer once, then cut full-size slices from it.
                one_batch = Batch.concat(current_batches, copy=False)
                pos = 0
                while pos + batch_size <= current_size:
                    new_batch = one_batch[pos : pos + batch_size]
                    new_batch.reset_index()
                    yield new_batch
                    pos += batch_size
                # Keep the tail that did not fill a full batch for the next round.
                if pos < current_size:
                    current_batches = [one_batch[pos:]]
                    current_size = len(current_batches[0])
                else:
                    current_batches = []
                    current_size = 0

        # Flush leftover rows; skip when nothing is pending so that no empty
        # trailing batch is emitted downstream.
        if current_size > 0:
            last_batch = Batch.concat(current_batches, copy=False)
            yield last_batch
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc