• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #754

04 Sep 2023 09:54PM UTC coverage: 74.807% (-5.5%) from 80.336%
#754

push

circle-ci

jiashenC
update case

8727 of 11666 relevant lines covered (74.81%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.17
/evadb/executor/load_multimedia_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import multiprocessing as mp
1✔
16
from multiprocessing import Pool
1✔
17
from pathlib import Path
1✔
18

19
import pandas as pd
1✔
20

21
from evadb.catalog.models.table_catalog import TableCatalogEntry
1✔
22
from evadb.database import EvaDBDatabase
1✔
23
from evadb.executor.abstract_executor import AbstractExecutor
1✔
24
from evadb.executor.executor_utils import ExecutorError, iter_path_regex, validate_media
1✔
25
from evadb.models.storage.batch import Batch
1✔
26
from evadb.parser.types import FileFormatType
1✔
27
from evadb.plan_nodes.load_data_plan import LoadDataPlan
1✔
28
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
1✔
29
from evadb.storage.storage_engine import StorageEngine
1✔
30
from evadb.utils.errors import DatasetFileNotFoundError
1✔
31
from evadb.utils.generic_utils import try_to_import_cv2, try_to_import_decord
1✔
32
from evadb.utils.logging_manager import logger
1✔
33
from evadb.utils.s3_utils import download_from_s3
1✔
34

35

36
class LoadMultimediaExecutor(AbstractExecutor):
    """Load image or video files into a multimedia table.

    The source path from the plan node is resolved to a list of files
    (downloading them first when the path is on S3). Every file is validated
    for the configured media type. The target table is created if it does not
    exist, and one row per valid file is written to its ``file_path`` column.
    """

    def __init__(self, db: EvaDBDatabase, node: LoadDataPlan):
        super().__init__(db, node)
        self.media_type = self.node.file_options["file_format"]
        # Check up front that the packages needed for this media type are
        # importable.
        if self.media_type == FileFormatType.IMAGE:
            try_to_import_cv2()
        elif self.media_type == FileFormatType.VIDEO:
            try_to_import_decord()
            try_to_import_cv2()

    def exec(self, *args, **kwargs):
        """Run the load and report how many files were written.

        Yields:
            Batch: a single-row batch with the message
                ``"Number of loaded <MEDIA>: <count>"``.

        Raises:
            ExecutorError: wraps any exception raised while collecting,
                validating or writing the files. The original exception is
                chained as ``__cause__``. If this call created the table and a
                storage engine was obtained, the table is dropped before the
                error is raised.
        """
        storage_engine = None
        table_obj = None
        # Bound before the ``try`` so the rollback path below never depends on
        # how far execution got before failing.
        do_create = False
        try:
            media_files = self._collect_media_files()
            valid_files = self._validate_media_files(media_files)

            if not valid_files:
                raise DatasetFileNotFoundError(
                    f"no file found at -- '{self.node.file_path}'."
                )

            table_info = self.node.table_info
            database_name = table_info.database_name
            table_name = table_info.table_name

            # Reuse a table with the same name if one exists. Otherwise create
            # the catalog entry and remember that this call owns the table.
            table_obj = self.catalog().get_table_catalog_entry(
                table_name, database_name
            )
            if table_obj:
                msg = f"Adding to an existing table {table_name}."
                logger.info(msg)
            else:
                table_obj = (
                    self.catalog().create_and_insert_multimedia_table_catalog_entry(
                        table_name, self.media_type
                    )
                )
                do_create = True

            storage_engine = StorageEngine.factory(self.db, table_obj)
            if do_create:
                storage_engine.create(table_obj)

            storage_engine.write(
                table_obj,
                Batch(pd.DataFrame({"file_path": valid_files})),
            )

        except Exception as e:
            # Without both a storage engine and a table object there is
            # nothing to roll back.
            if storage_engine and table_obj:
                self._rollback_load(storage_engine, table_obj, do_create)
            err_msg = f"Load {self.media_type.name} failed: {str(e)}"
            raise ExecutorError(err_msg) from e
        else:
            yield Batch(
                pd.DataFrame(
                    [f"Number of loaded {self.media_type.name}: {len(valid_files)}"]
                )
            )

    def _collect_media_files(self):
        """Return the candidate media files for the plan's file path.

        Paths whose POSIX form starts with ``s3:/`` are downloaded into
        ``<storage.s3_download_dir>/<table_name>``, and the result of
        ``download_from_s3`` is returned. The single-slash prefix is used
        because pathlib collapses the double slash in ``s3://``. Any other
        path is expanded locally with ``iter_path_regex``.
        """
        file_path = self.node.file_path
        if file_path.as_posix().startswith("s3:/"):
            s3_dir = Path(self.config.get_value("storage", "s3_download_dir"))
            dst_path = s3_dir / self.node.table_info.table_name
            dst_path.mkdir(parents=True, exist_ok=True)
            return download_from_s3(file_path, dst_path)
        # Local storage.
        return list(iter_path_regex(file_path))

    def _validate_media_files(self, media_files):
        """Validate every file and return the valid ones as strings.

        Raises:
            ValueError: if at least one file fails validation. The message
                lists every invalid file, one per line.
        """
        cpu_count = mp.cpu_count()
        # Use parallel validation if there are many files; otherwise validate
        # in the current process.
        if len(media_files) < cpu_count * 2:
            valid_bitmap = [self._is_media_valid(path) for path in media_files]
        else:
            # TODO: move the pool size / threshold to the configuration file.
            # The context manager terminates the pool once ``map`` has returned
            # its results, so worker processes are not leaked.
            # NOTE(review): ``pool.map`` pickles the bound method, and thus
            # ``self``, for the workers. Confirm the executor is picklable.
            with Pool(cpu_count) as pool:
                valid_bitmap = pool.map(self._is_media_valid, media_files)

        invalid_files = [
            str(path)
            for path, is_valid in zip(media_files, valid_bitmap)
            if not is_valid
        ]
        if invalid_files:
            invalid_files_str = "\n".join(invalid_files)
            err_msg = f"no valid file found at -- '{invalid_files_str}'."
            raise ValueError(err_msg)

        return [
            str(path)
            for path, is_valid in zip(media_files, valid_bitmap)
            if is_valid
        ]

    def _rollback_load(
        self,
        storage_engine: AbstractStorageEngine,
        table_obj: TableCatalogEntry,
        do_create: bool,
    ):
        """Undo a failed load.

        Only a table created by this load (``do_create`` is true) is dropped
        through the storage engine. A pre-existing table is left untouched.
        """
        if do_create:
            storage_engine.drop(table_obj)

    def _is_media_valid(
        self,
        file_path: Path,
    ):
        """Return ``True`` if ``validate_media`` accepts the file, else ``False``.

        ``file_path`` may be a ``str`` or a ``Path``. It is normalised to
        ``Path`` before validation.
        """
        file_path = Path(file_path)
        if validate_media(file_path, self.media_type):
            return True
        return False
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc