• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 7a35fea5-b214-41c0-a2c4-5ff71bc5992a

pending completion
7a35fea5-b214-41c0-a2c4-5ff71bc5992a

Pull #732

circle-ci

Yulai Cui
fix integration test
Pull Request #732: feat: OVERWRITE statement

345 of 345 new or added lines in 21 files covered. (100.0%)

9560 of 9786 relevant lines covered (97.69%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.06
/eva/executor/overwrite_executor.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import os
1✔
16
from collections import defaultdict
1✔
17

18
import cv2
1✔
19
import numpy as np
1✔
20
import pandas as pd
1✔
21
from PIL import Image as im
1✔
22

23
from eva.catalog.catalog_manager import CatalogManager
1✔
24
from eva.catalog.catalog_type import TableType
1✔
25
from eva.executor.abstract_executor import AbstractExecutor
1✔
26
from eva.expression.function_expression import FunctionExpression
1✔
27
from eva.models.storage.batch import Batch
1✔
28
from eva.plan_nodes.overwrite_plan import OverwritePlan
1✔
29
from eva.storage.storage_engine import StorageEngine
1✔
30

31

32
class OverwriteExecutor(AbstractExecutor):
    """Execute an OVERWRITE statement.

    Applies ``self.operation`` (asserted to be a ``FunctionExpression``) to
    every frame of a video table or every image of an image table. It then
    writes the transformed media to new files, clears the original entries
    from storage and registers the new files in the same table.
    """

    # Memory budget handed to ``StorageEngine.read`` for video tables.
    # NOTE(review): presumably bytes per batch — confirm against the engine.
    BATCH_MEM_SIZE = 30000000

    def __init__(self, node: OverwritePlan):
        super().__init__(node)
        self.catalog = CatalogManager()
        self.table_ref = self.node.table_ref
        self.operation = self.node.operation

    def make_new_video_path(self, video_path: str) -> str:
        """Return the path the overwritten copy of ``video_path`` is written to.

        The file keeps its name but is placed in a ``modified`` directory
        beside the original (``a/b/c.mp4`` -> ``a/b/modified/c.mp4``). That
        directory is created if it does not exist yet.
        """
        parent_dir, file_name = os.path.split(video_path)
        modified_dir = os.path.join(parent_dir, "modified")
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs(modified_dir, exist_ok=True)
        return os.path.join(modified_dir, file_name)

    def make_new_image_path(self, image_path: str) -> str:
        """Return ``image_path`` with its file name prefixed by ``modified_``.

        Example: ``a/b.jpg`` -> ``a/modified_b.jpg``.
        """
        parent_dir, file_name = os.path.split(image_path)
        return os.path.join(parent_dir, "modified_" + file_name)

    def exec(self, *args, **kwargs):
        """Run the overwrite and yield a single one-row status ``Batch``.

        For unsupported table types nothing is modified and the yielded
        batch carries an explanatory message instead.

        Raises:
            AssertionError: if the plan's operation is not a
                ``FunctionExpression``.
        """
        assert isinstance(self.operation, FunctionExpression)

        table_obj = self.table_ref.table.table_obj
        storage_engine = StorageEngine.factory(table_obj)

        if table_obj.table_type == TableType.VIDEO_DATA:
            self._overwrite_videos(table_obj, storage_engine)
        elif table_obj.table_type == TableType.IMAGE_DATA:
            self._overwrite_images(table_obj, storage_engine)
        else:
            yield self._status_batch(
                "Overwrite only supports video and image data for : {}".format(
                    self.operation.name
                )
            )
            return

        yield self._status_batch(
            "Table successfully overwritten by: {}".format(self.operation.name)
        )

    def _evaluate_batch(self, batch):
        """Evaluate the operation on ``batch``; return the first output column as an array."""
        # The expression refers to columns through the table alias, so the
        # alias is applied for evaluation and removed again afterwards.
        batch.modify_column_alias(self.table_ref.alias)
        res = self.operation.evaluate(batch)
        batch.drop_column_alias()
        return res.frames.iloc[:, 0].to_numpy()

    def _overwrite_videos(self, table_obj, storage_engine) -> None:
        """Transform every frame of every video and replace the video files."""
        # video path -> list of per-batch uint8 frame arrays. They are
        # concatenated once at the end rather than re-stacked every batch.
        frame_chunks = defaultdict(list)

        for batch in storage_engine.read(table_obj, self.BATCH_MEM_SIZE):
            names = list(batch.frames["name"])
            modified_frames = self._evaluate_batch(batch)

            assert len(names) == len(modified_frames)
            if len(modified_frames) == 0:
                continue

            fh, fw = modified_frames[0].shape[:2]

            frame_counts = defaultdict(int)
            for name in names:
                frame_counts[name] += 1

            videos = {
                path: np.empty((count, fh, fw, 3), np.dtype("uint8"))
                for path, count in frame_counts.items()
            }
            next_index = defaultdict(int)
            for name, frame in zip(names, modified_frames):
                # Assigning into the uint8 buffer converts the frame's dtype.
                videos[name][next_index[name]] = frame
                next_index[name] += 1

            for path, frames in videos.items():
                frame_chunks[path].append(frames)

        all_videos = {
            path: chunks[0] if len(chunks) == 1 else np.concatenate(chunks)
            for path, chunks in frame_chunks.items()
        }
        del frame_chunks

        modified_paths = {
            path: self.make_new_video_path(path) for path in all_videos
        }
        # Probe frame rates before clear(), which may remove the source files.
        # If a source can't be probed, fall back to fps = frame count
        # (producing a ~1 s clip) so a file is still written.
        fps_by_video = {
            path: self._read_video_fps(path, fallback=len(frames))
            for path, frames in all_videos.items()
        }

        storage_engine.clear(table_obj, list(all_videos))

        for path, frames in all_videos.items():
            self._write_video(modified_paths[path], frames, fps_by_video[path])

        storage_engine.write(
            table_obj,
            Batch(
                pd.DataFrame(
                    {"file_path": [modified_paths[path] for path in all_videos]}
                )
            ),
        )

    def _overwrite_images(self, table_obj, storage_engine) -> None:
        """Transform every image and replace the image files."""
        original_paths = []
        modified_paths = []

        for batch in storage_engine.read(table_obj):
            image_paths = list(batch.frames["name"])
            modified_images = self._evaluate_batch(batch)

            assert len(image_paths) == len(modified_images)

            for image_path, image in zip(image_paths, modified_images):
                modified_path = self.make_new_image_path(image_path)
                modified_paths.append(modified_path)
                im.fromarray(image).save(modified_path)
            # Remember the originals of *every* batch so all get cleared.
            original_paths.extend(image_paths)

        storage_engine.clear(table_obj, original_paths)
        storage_engine.write(
            table_obj, Batch(pd.DataFrame({"file_path": modified_paths}))
        )

    @staticmethod
    def _read_video_fps(video_path: str, fallback: float) -> float:
        """Return the frame rate of ``video_path``, or ``fallback`` if unreadable."""
        capture = cv2.VideoCapture(video_path)
        try:
            # get() returns 0 when the capture could not be opened.
            fps = capture.get(cv2.CAP_PROP_FPS)
        finally:
            capture.release()
        return fps if fps > 0 else fallback

    @staticmethod
    def _write_video(video_path: str, frames, fps: float) -> None:
        """Encode ``frames`` (shape ``(n, h, w, 3)``, uint8) as an mp4v video."""
        _, fh, fw, _ = frames.shape
        writer = cv2.VideoWriter(
            video_path,
            cv2.VideoWriter_fourcc(*"mp4v"),
            fps,
            (fw, fh),
        )
        try:
            # NOTE(review): OpenCV expects BGR channel order — confirm the
            # storage engine / UDF output matches.
            for frame in frames:
                writer.write(frame)
        finally:
            writer.release()

    @staticmethod
    def _status_batch(message: str) -> Batch:
        """Wrap ``message`` in a one-row, one-column (label ``0``) ``Batch``."""
        return Batch(pd.DataFrame([message], index=[0]))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc