• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #844

31 Oct 2023 05:42AM UTC coverage: 0.0%. Remained the same
#844

push

circle-ci

Andy Xu
Fix typo

0 of 12389 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/executor/load_multimedia_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import multiprocessing as mp
×
16
from multiprocessing import Pool
×
17
from pathlib import Path
×
18

19
import pandas as pd
×
20

21
from evadb.catalog.models.table_catalog import TableCatalogEntry
×
22
from evadb.database import EvaDBDatabase
×
23
from evadb.executor.abstract_executor import AbstractExecutor
×
24
from evadb.executor.executor_utils import ExecutorError, iter_path_regex, validate_media
×
25
from evadb.models.storage.batch import Batch
×
26
from evadb.parser.types import FileFormatType
×
27
from evadb.plan_nodes.load_data_plan import LoadDataPlan
×
28
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
×
29
from evadb.storage.storage_engine import StorageEngine
×
30
from evadb.utils.errors import DatasetFileNotFoundError
×
31
from evadb.utils.generic_utils import try_to_import_cv2, try_to_import_decord
×
32
from evadb.utils.logging_manager import logger
×
33
from evadb.utils.s3_utils import download_from_s3
×
34

35

36
class LoadMultimediaExecutor(AbstractExecutor):
    """Executor that loads image/video files (from local disk or S3) into a
    multimedia table by validating each file and persisting the valid paths
    through the storage engine."""

    def __init__(self, db: EvaDBDatabase, node: LoadDataPlan):
        super().__init__(db, node)
        self.media_type = self.node.file_options["file_format"]
        # Fail fast if the decoding packages for this media type are missing.
        if self.media_type == FileFormatType.IMAGE:
            try_to_import_cv2()
        elif self.media_type == FileFormatType.VIDEO:
            try_to_import_decord()
            try_to_import_cv2()

    def exec(self, *args, **kwargs):
        """Validate media files, create or extend the table catalog entry,
        and write the valid file paths via the storage engine.

        Yields:
            Batch: a single-row batch reporting how many files were loaded.

        Raises:
            ExecutorError: wraps any underlying failure; if this executor
                created the table, the table is dropped before re-raising.
        """
        storage_engine = None
        table_obj = None
        # Hoisted out of the try-block so the rollback path can never hit an
        # unbound local, regardless of where the exception was raised.
        do_create = False
        try:
            video_files = []
            valid_files = []

            # If it is a s3 path, download the file to local
            if self.node.file_path.as_posix().startswith("s3:/"):
                s3_dir = Path(
                    self.catalog().get_configuration_catalog_value("s3_download_dir")
                )
                dst_path = s3_dir / self.node.table_info.table_name
                dst_path.mkdir(parents=True, exist_ok=True)
                video_files = download_from_s3(self.node.file_path, dst_path)
            else:
                # Local Storage: expand the (possibly regex) path.
                video_files = list(iter_path_regex(self.node.file_path))

            # Use parallel validation only when there are enough files to
            # amortize the process-pool startup cost; otherwise validate
            # in the current process.
            valid_files, invalid_files = [], []
            if len(video_files) < mp.cpu_count() * 2:
                valid_bitmap = [self._is_media_valid(path) for path in video_files]
            else:
                # TODO: move this to configuration file later.
                # BUGFIX: use the pool as a context manager so the worker
                # processes are always terminated and joined instead of being
                # leaked when map() finishes or raises.
                with Pool(mp.cpu_count()) as pool:
                    valid_bitmap = pool.map(self._is_media_valid, video_files)

            # Raise error if any file is invalid.
            if False in valid_bitmap:
                invalid_files = [
                    str(path)
                    for path, is_valid in zip(video_files, valid_bitmap)
                    if not is_valid
                ]

                invalid_files_str = "\n".join(invalid_files)
                err_msg = f"no valid file found at -- '{invalid_files_str}'."
                raise ValueError(err_msg)

            # Get valid files.
            valid_files = [
                str(path)
                for path, is_valid in zip(video_files, valid_bitmap)
                if is_valid
            ]

            if not valid_files:
                raise DatasetFileNotFoundError(
                    f"no file found at -- '{str(self.node.file_path)}'."
                )

            # Create catalog entry
            table_info = self.node.table_info
            database_name = table_info.database_name
            table_name = table_info.table_name
            # Sanity check to make sure there is no existing table with same name
            table_obj = self.catalog().get_table_catalog_entry(
                table_name, database_name
            )
            if table_obj:
                msg = f"Adding to an existing table {table_name}."
                logger.info(msg)
            # Create the catalog entry
            else:
                table_obj = (
                    self.catalog().create_and_insert_multimedia_table_catalog_entry(
                        table_name, self.media_type
                    )
                )
                do_create = True

            storage_engine = StorageEngine.factory(self.db, table_obj)
            if do_create:
                storage_engine.create(table_obj)

            storage_engine.write(
                table_obj,
                Batch(pd.DataFrame({"file_path": valid_files})),
            )

        except Exception as e:
            # If we fail to obtain the storage engine or table object,
            # there is no further action to take.
            if storage_engine and table_obj:
                self._rollback_load(storage_engine, table_obj, do_create)
            err_msg = f"Load {self.media_type.name} failed: {str(e)}"
            # BUGFIX: chain the original exception so callers keep the
            # underlying traceback instead of only the wrapped message.
            raise ExecutorError(err_msg) from e
        else:
            yield Batch(
                pd.DataFrame(
                    [
                        f"Number of loaded {self.media_type.name}: {str(len(valid_files))}"
                    ]
                )
            )

    def _rollback_load(
        self,
        storage_engine: AbstractStorageEngine,
        table_obj: TableCatalogEntry,
        do_create: bool,
    ):
        """Undo a partially-completed load: drop the table only if this
        executor created it (pre-existing tables are left untouched)."""
        if do_create:
            storage_engine.drop(table_obj)

    def _is_media_valid(
        self,
        file_path: Path,
    ) -> bool:
        """Return True iff *file_path* is a valid file of self.media_type."""
        # validate_media's result is used as a predicate; coerce to a plain
        # bool to preserve the original True/False return contract.
        return bool(validate_media(Path(file_path), self.media_type))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc