• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #725

pending completion
#725

push

circle-ci

web-flow
chore: reducing coverage loss (#619)

* adding delete operation

* Adding Insert Statement

* checkpoint

* supporting multiple entries

* implemented for structured data error

* adding parser visitor for delete

* delete executor

* delete plan and rules

* adding delete to plan executor

* change position of LogicalDelete

* logical delimiter

* delete test case

* adding test case

* adding test case

* adding delete testcase

* adding predicate to delete executor

* adding delete to Image storage

* bug fix in delete

* fixing testcase

* adding test case for insert statement

* remove order_by from statement_binder.py

* better variable names, using Batch

* error message for insert

* removing order_by and limit from delete

* remove order_by and limit

* use f-string

* adding to changelog

* removing commit messages

* formatting

* fixing comments

* formatting

* eva insert f32 values

* fix: should delete range

* delete multiple rows

* udf bootstrap

* try to run tests in parallel

* minor fix for ray to work

* ray fixes

---------

Co-authored-by: Aryan-Rajoria <aryanrajoria1003@gmail.com>
Co-authored-by: Gaurav <gaurav21776@gmail.com>

236 of 236 new or added lines in 53 files covered. (100.0%)

7003 of 9005 relevant lines covered (77.77%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.81
/eva/executor/load_multimedia_executor.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from pathlib import Path
1✔
16

17
import pandas as pd
1✔
18

19
from eva.catalog.catalog_manager import CatalogManager
1✔
20
from eva.catalog.models.table_catalog import TableCatalogEntry
1✔
21
from eva.configuration.configuration_manager import ConfigurationManager
1✔
22
from eva.executor.abstract_executor import AbstractExecutor
1✔
23
from eva.executor.executor_utils import ExecutorError, iter_path_regex, validate_media
1✔
24
from eva.models.storage.batch import Batch
1✔
25
from eva.plan_nodes.load_data_plan import LoadDataPlan
1✔
26
from eva.storage.abstract_storage_engine import AbstractStorageEngine
1✔
27
from eva.storage.storage_engine import StorageEngine
1✔
28
from eva.utils.errors import DatasetFileNotFoundError
1✔
29
from eva.utils.logging_manager import logger
1✔
30
from eva.utils.s3_utils import download_from_s3
1✔
31

32

33
class LoadMultimediaExecutor(AbstractExecutor):
    """Executor that loads multimedia files (e.g. video/image) into a table.

    Supports both local paths (expanded via ``iter_path_regex``) and S3 URIs
    (downloaded to a local staging directory first). Creates the backing
    table catalog entry on first load and rolls back partially-created
    state if the load fails.
    """

    def __init__(self, node: LoadDataPlan):
        super().__init__(node)
        self.catalog = CatalogManager()
        # file_format identifies the media type being loaded (used for
        # validation and for naming in log/error messages).
        self.media_type = self.node.file_options["file_format"]

    def exec(self, *args, **kwargs):
        """Collect valid media files, write them to storage, and yield a
        single-row Batch summarizing how many files were loaded.

        Raises:
            ExecutorError: wraps any failure (invalid file, no files found,
                storage errors) after rolling back partially-created state.
        """
        storage_engine = None
        table_obj = None
        # Initialized before the try-block so the rollback path in `except`
        # can never hit an unbound local, regardless of where the failure
        # happened.
        do_create = False
        try:
            video_files = []
            valid_files = []

            # If it is a s3 path, download the file to local
            if self.node.file_path.as_posix().startswith("s3:/"):
                s3_dir = Path(
                    ConfigurationManager().get_value("storage", "s3_download_dir")
                )
                dst_path = s3_dir / self.node.table_info.table_name
                dst_path.mkdir(parents=True, exist_ok=True)
                video_files = download_from_s3(self.node.file_path, dst_path)
            else:
                # Local Storage: expand the (possibly regex) path lazily
                video_files = iter_path_regex(self.node.file_path)

            for file_path in video_files:
                file_path = Path(file_path)
                if validate_media(file_path, self.media_type):
                    valid_files.append(str(file_path))
                else:
                    err_msg = f"Load {self.media_type.name} failed due to invalid file {str(file_path)}"
                    logger.error(err_msg)
                    # Raise with the descriptive message (not just the raw
                    # path) so the wrapped ExecutorError below is
                    # self-explanatory.
                    raise ValueError(err_msg)

            if not valid_files:
                raise DatasetFileNotFoundError(
                    f"Load {self.media_type.name} failed due to no valid files found on path {str(self.node.file_path)}"
                )

            # Create catalog entry
            table_info = self.node.table_info
            database_name = table_info.database_name
            table_name = table_info.table_name
            # Sanity check to make sure there is no existing table with same name
            table_obj = self.catalog.get_table_catalog_entry(table_name, database_name)
            if table_obj:
                msg = f"Adding to an existing table {table_name}."
                logger.info(msg)
            # Create the catalog entry
            else:
                table_obj = (
                    self.catalog.create_and_insert_multimedia_table_catalog_entry(
                        table_name, self.media_type
                    )
                )
                do_create = True

            storage_engine = StorageEngine.factory(table_obj)
            if do_create:
                storage_engine.create(table_obj)

            storage_engine.write(
                table_obj,
                Batch(pd.DataFrame({"file_path": valid_files})),
            )

        except Exception as e:
            # If we fail to obtain the storage engine or table object,
            # there is no further action to take.
            if storage_engine and table_obj:
                self._rollback_load(storage_engine, table_obj, do_create)
            err_msg = f"Load {self.media_type.name} failed: encountered unexpected error {str(e)}"
            logger.error(err_msg)
            # Chain the original exception so the full causal traceback is
            # preserved for debugging (PEP 3134).
            raise ExecutorError(err_msg) from e
        else:
            yield Batch(
                pd.DataFrame(
                    [
                        f"Number of loaded {self.media_type.name}: {str(len(valid_files))}"
                    ]
                )
            )

    def _rollback_load(
        self,
        storage_engine: AbstractStorageEngine,
        table_obj: TableCatalogEntry,
        do_create: bool,
    ):
        """Undo a partially-completed load.

        Drops the backing table only when this executor created it during
        the current call (``do_create``); pre-existing tables are left
        untouched.
        """
        if do_create:
            storage_engine.drop(table_obj)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc