• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 0b218f24-fe34-4bbf-bf3d-6268f0b17026

pending completion
0b218f24-fe34-4bbf-bf3d-6268f0b17026

Pull #619

circle-ci

jarulraj
checkpoint
Pull Request #619: chore: reducing coverage loss

208 of 208 new or added lines in 40 files covered. (100.0%)

8800 of 9000 relevant lines covered (97.78%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.44
/eva/executor/load_multimedia_executor.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from pathlib import Path
1✔
16

17
import pandas as pd
1✔
18

19
from eva.catalog.catalog_manager import CatalogManager
1✔
20
from eva.catalog.models.table_catalog import TableCatalogEntry
1✔
21
from eva.configuration.configuration_manager import ConfigurationManager
1✔
22
from eva.executor.abstract_executor import AbstractExecutor
1✔
23
from eva.executor.executor_utils import ExecutorError, iter_path_regex, validate_media
1✔
24
from eva.models.storage.batch import Batch
1✔
25
from eva.plan_nodes.load_data_plan import LoadDataPlan
1✔
26
from eva.storage.abstract_storage_engine import AbstractStorageEngine
1✔
27
from eva.storage.storage_engine import StorageEngine
1✔
28
from eva.utils.errors import DatasetFileNotFoundError
1✔
29
from eva.utils.logging_manager import logger
1✔
30
from eva.utils.s3_utils import download_from_s3
1✔
31

32

33
class LoadMultimediaExecutor(AbstractExecutor):
    """Executor for loading multimedia files into a table.

    The media type is read from the plan node's ``file_options["file_format"]``.
    Files are located (after an S3 download when the path looks like an S3
    location), validated against the media type, and their paths are written
    through the storage engine obtained for the target table.
    """

    def __init__(self, node: LoadDataPlan):
        """Store the plan node and read the media type it requests.

        Args:
            node: load plan; its ``file_options["file_format"]`` entry is used
                as the media type for validation and table creation.
        """
        super().__init__(node)
        self.catalog = CatalogManager()
        self.media_type = self.node.file_options["file_format"]

    def exec(self):
        """Load the files named by the plan and yield a one-row summary.

        Yields:
            A single ``Batch`` wrapping a DataFrame whose only cell is the
            message ``"Number of loaded <media type>: <count>"``.

        Raises:
            ExecutorError: if any step of the load fails. The triggering
                exception is attached as ``__cause__``.
        """
        storage_engine = None
        table_obj = None
        # Bound before the ``try`` so the error handler can always read it,
        # regardless of how far the load progressed.
        do_create = False
        try:
            valid_files = self._validate_files(self._find_candidate_files())

            table_info = self.node.table_info
            database_name = table_info.database_name
            table_name = table_info.table_name

            # Reuse an existing table of the same name; otherwise register a
            # new multimedia table in the catalog.
            table_obj = self.catalog.get_table_catalog_entry(table_name, database_name)
            if table_obj:
                msg = f"Adding to an existing table {table_name}."
                logger.info(msg)
            else:
                table_obj = (
                    self.catalog.create_and_insert_multimedia_table_catalog_entry(
                        table_name, self.media_type
                    )
                )
                do_create = True

            storage_engine = StorageEngine.factory(table_obj)
            if do_create:
                storage_engine.create(table_obj)

            storage_engine.write(
                table_obj,
                Batch(pd.DataFrame({"file_path": valid_files})),
            )

        except Exception as e:
            # If we fail to obtain the storage engine or table object,
            # there is no further action to take.
            if storage_engine and table_obj:
                try:
                    self._rollback_load(storage_engine, table_obj, do_create)
                except Exception as rollback_error:
                    # A failing rollback must not hide the error that caused
                    # it; record it and still report the original failure.
                    logger.error(
                        f"Rollback of failed load also failed: {rollback_error}"
                    )
            err_msg = (
                f"Load {self.media_type.name} failed: "
                f"encountered unexpected error {e}"
            )
            logger.error(err_msg)
            raise ExecutorError(err_msg) from e
        else:
            yield Batch(
                pd.DataFrame(
                    [f"Number of loaded {self.media_type.name}: {len(valid_files)}"]
                )
            )

    def _find_candidate_files(self):
        """Return the files designated by the plan's ``file_path``.

        Paths whose POSIX form starts with ``"s3:/"`` are downloaded into
        ``<storage.s3_download_dir>/<table name>`` and the result of
        ``download_from_s3`` is returned; any other path is handed to
        ``iter_path_regex``.
        """
        source = self.node.file_path
        # NOTE(review): the single-slash prefix presumably accounts for
        # pathlib collapsing the "//" of an "s3://" URL -- confirm.
        if source.as_posix().startswith("s3:/"):
            s3_dir = Path(
                ConfigurationManager().get_value("storage", "s3_download_dir")
            )
            dst_path = s3_dir / self.node.table_info.table_name
            dst_path.mkdir(parents=True, exist_ok=True)
            return download_from_s3(source, dst_path)

        # Local Storage
        return iter_path_regex(source)

    def _validate_files(self, candidates):
        """Return the candidate paths as strings once all pass validation.

        Args:
            candidates: iterable of path-like entries to check.

        Raises:
            ValueError: on the first file rejected by ``validate_media``;
                the offending path is the exception argument.
            DatasetFileNotFoundError: if there were no candidates at all.
        """
        valid_files = []
        for candidate in candidates:
            candidate = Path(candidate)
            if not validate_media(candidate, self.media_type):
                err_msg = (
                    f"Load {self.media_type.name} failed due to "
                    f"invalid file {candidate}"
                )
                logger.error(err_msg)
                raise ValueError(candidate)
            valid_files.append(str(candidate))

        if not valid_files:
            raise DatasetFileNotFoundError(
                f"Load {self.media_type.name} failed due to no valid files "
                f"found on path {self.node.file_path}"
            )
        return valid_files

    def _rollback_load(
        self,
        storage_engine: AbstractStorageEngine,
        table_obj: TableCatalogEntry,
        do_create: bool,
    ):
        """Undo the table creation performed by a failed load.

        Args:
            storage_engine: engine that was obtained for ``table_obj``.
            table_obj: catalog entry of the table being loaded.
            do_create: True when this load created the table. Only then is
                the table dropped; a pre-existing table is left untouched.
        """
        if do_create:
            storage_engine.drop(table_obj)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc