
georgia-tech-db / eva / d32fa926-274f-4de7-bc28-556f7bc3ee90 (pending completion)

Pull Request #814: feat: benchmark question answering v1
CI: circle-ci · Committer: jiashenC · Commit message: "wip: time info"

42 of 42 new or added lines in 2 files covered. (100.0%)

10095 of 10421 relevant lines covered (96.87%)

0.97 hits per line

Source File

/eva/executor/load_multimedia_executor.py (98.53% of lines covered)
# coding=utf-8
# Copyright 2018-2023 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing as mp
from multiprocessing import Pool
from pathlib import Path

import pandas as pd

from eva.catalog.catalog_manager import CatalogManager
from eva.catalog.models.table_catalog import TableCatalogEntry
from eva.configuration.configuration_manager import ConfigurationManager
from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import ExecutorError, iter_path_regex, validate_media
from eva.models.storage.batch import Batch
from eva.plan_nodes.load_data_plan import LoadDataPlan
from eva.storage.abstract_storage_engine import AbstractStorageEngine
from eva.storage.storage_engine import StorageEngine
from eva.utils.errors import DatasetFileNotFoundError
from eva.utils.logging_manager import logger
from eva.utils.s3_utils import download_from_s3


class LoadMultimediaExecutor(AbstractExecutor):
    def __init__(self, node: LoadDataPlan):
        super().__init__(node)
        self.catalog = CatalogManager()
        self.media_type = self.node.file_options["file_format"]

    def exec(self, *args, **kwargs):
        storage_engine = None
        table_obj = None
        try:
            video_files = []
            valid_files = []

            # If it is a s3 path, download the file to local
            if self.node.file_path.as_posix().startswith("s3:/"):
                s3_dir = Path(
                    ConfigurationManager().get_value("storage", "s3_download_dir")
                )
                dst_path = s3_dir / self.node.table_info.table_name
                dst_path.mkdir(parents=True, exist_ok=True)
                video_files = download_from_s3(self.node.file_path, dst_path)
            else:
                # Local Storage
                video_files = list(iter_path_regex(self.node.file_path))

            # Use parallel validation if there are many files. Otherwise, use single-thread
            # validation version.
            valid_files, invalid_files = [], []
            if len(video_files) < mp.cpu_count() * 2:
                valid_bitmap = [self._is_media_valid(path) for path in video_files]
            else:
                # TODO: move this to configuration file later.
                pool = Pool(mp.cpu_count())
                valid_bitmap = pool.map(self._is_media_valid, video_files)

            # Raise error if any file is invalid.
            if False in valid_bitmap:
                invalid_files = [
                    str(path)
                    for path, is_valid in zip(video_files, valid_bitmap)
                    if not is_valid
                ]

                invalid_files_str = "\n".join(invalid_files)
                err_msg = f"Load {self.media_type.name} failed due to invalid files: \n{invalid_files_str}"
                logger.error(err_msg)
                raise ValueError(err_msg)

            # Get valid files.
            valid_files = [
                str(path)
                for path, is_valid in zip(video_files, valid_bitmap)
                if is_valid
            ]

            if not valid_files:
                raise DatasetFileNotFoundError(
                    f"Load {self.media_type.name} failed due to no valid files found on path {str(self.node.file_path)}"
                )

            # Create catalog entry
            table_info = self.node.table_info
            database_name = table_info.database_name
            table_name = table_info.table_name
            # Sanity check to make sure there is no existing table with same name
            do_create = False
            table_obj = self.catalog.get_table_catalog_entry(table_name, database_name)
            if table_obj:
                msg = f"Adding to an existing table {table_name}."
                logger.info(msg)
            # Create the catalog entry
            else:
                table_obj = (
                    self.catalog.create_and_insert_multimedia_table_catalog_entry(
                        table_name, self.media_type
                    )
                )
                do_create = True

            storage_engine = StorageEngine.factory(table_obj)
            if do_create:
                storage_engine.create(table_obj)

            storage_engine.write(
                table_obj,
                Batch(pd.DataFrame({"file_path": valid_files})),
            )

        except Exception as e:
            # If we fail to obtain the storage engine or table object,
            # there is no further action to take.
            if storage_engine and table_obj:
                self._rollback_load(storage_engine, table_obj, do_create)
            err_msg = f"Load {self.media_type.name} failed: encountered unexpected error {str(e)}"
            logger.error(err_msg)
            raise ExecutorError(err_msg)
        else:
            yield Batch(
                pd.DataFrame(
                    [
                        f"Number of loaded {self.media_type.name}: {str(len(valid_files))}"
                    ]
                )
            )

    def _rollback_load(
        self,
        storage_engine: AbstractStorageEngine,
        table_obj: TableCatalogEntry,
        do_create: bool,
    ):
        if do_create:
            storage_engine.drop(table_obj)

    def _is_media_valid(
        self,
        file_path: Path,
    ):
        file_path = Path(file_path)
        if validate_media(file_path, self.media_type):
            return True
        return False

Coverage note: every annotated line above shows 1 hit (✔) except the storage_engine.drop(table_obj) call inside _rollback_load, which is uncovered (×).
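
The annotations flag exactly one uncovered statement: the storage_engine.drop(table_obj) call inside _rollback_load. Below is a minimal sketch, not taken from the EVA test suite, of how that branch could be exercised with unittest.mock; the test name is ours, and it assumes the eva package is importable in the test environment.

from unittest.mock import MagicMock

from eva.executor.load_multimedia_executor import LoadMultimediaExecutor


def test_rollback_drops_table_only_when_created_by_this_load():
    # Bypass __init__, which requires a LoadDataPlan node and a CatalogManager.
    executor = LoadMultimediaExecutor.__new__(LoadMultimediaExecutor)

    storage_engine = MagicMock()
    table_obj = MagicMock()

    # do_create=True: the table was created by this load, so rollback drops it.
    executor._rollback_load(storage_engine, table_obj, do_create=True)
    storage_engine.drop.assert_called_once_with(table_obj)

    # do_create=False: the table pre-existed, so rollback must not drop it.
    storage_engine.reset_mock()
    executor._rollback_load(storage_engine, table_obj, do_create=False)
    storage_engine.drop.assert_not_called()

Because _rollback_load only operates on the storage engine and catalog entry it is handed, it can be tested in isolation like this, without a catalog database or any on-disk media.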