• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / d88c9094-1a92-4944-91ea-5e7e8275cb11

23 Nov 2023 10:19PM UTC coverage: 67.117% (+67.1%) from 0.0%
d88c9094-1a92-4944-91ea-5e7e8275cb11

push

circleci

web-flow
Merge branch 'georgia-tech-db:staging' into cost_batching

342 of 692 new or added lines in 47 files covered. (49.42%)

12 existing lines in 4 files now uncovered.

9189 of 13691 relevant lines covered (67.12%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.45
/evadb/executor/load_multimedia_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import multiprocessing as mp
1✔
16
from multiprocessing import Pool
1✔
17
from pathlib import Path
1✔
18

19
import pandas as pd
1✔
20

21
from evadb.catalog.models.table_catalog import TableCatalogEntry
1✔
22
from evadb.database import EvaDBDatabase
1✔
23
from evadb.executor.abstract_executor import AbstractExecutor
1✔
24
from evadb.executor.executor_utils import ExecutorError, iter_path_regex, validate_media
1✔
25
from evadb.models.storage.batch import Batch
1✔
26
from evadb.parser.types import FileFormatType
1✔
27
from evadb.plan_nodes.load_data_plan import LoadDataPlan
1✔
28
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
1✔
29
from evadb.storage.storage_engine import StorageEngine
1✔
30
from evadb.utils.errors import DatasetFileNotFoundError
1✔
31
from evadb.utils.generic_utils import try_to_import_cv2, try_to_import_decord
1✔
32
from evadb.utils.logging_manager import logger
1✔
33
from evadb.utils.s3_utils import download_from_s3
1✔
34

35

36
class LoadMultimediaExecutor(AbstractExecutor):
    """Load image/video files described by a ``LoadDataPlan`` into a table.

    The executor resolves the plan's file path (downloading from S3 when the
    path starts with ``s3:/``), validates every matched file for the plan's
    media type, creates the target multimedia table if it does not exist yet,
    and writes the paths of the valid files into it.
    """

    def __init__(self, db: EvaDBDatabase, node: LoadDataPlan):
        super().__init__(db, node)
        self.media_type = self.node.file_options["file_format"]
        # Check up front that the decoding packages for this media type are
        # importable (the try_to_import_* helpers presumably raise otherwise).
        if self.media_type == FileFormatType.IMAGE:
            try_to_import_cv2()
        elif self.media_type == FileFormatType.VIDEO:
            try_to_import_decord()
            try_to_import_cv2()

    def exec(self, *args, **kwargs):
        """Validate and load the media files referenced by the plan node.

        Invalid files are logged and skipped; only valid files are written.

        Yields:
            A single ``Batch`` holding a one-row DataFrame that reports how
            many files were loaded.

        Raises:
            ExecutorError: wraps any failure during loading (e.g. no valid
                file found, catalog or storage errors). If this call created
                the table, it is dropped before the error is raised.
        """
        storage_engine = None
        table_obj = None
        # Bound before anything can fail so the rollback path never depends
        # on how far the try block got.
        do_create = False
        try:
            # If it is an S3 path, download the files to local storage first.
            # (pathlib collapses "s3://" into "s3:/", hence the prefix check.)
            if self.node.file_path.as_posix().startswith("s3:/"):
                s3_dir = Path(
                    self.catalog().get_configuration_catalog_value("s3_download_dir")
                )
                dst_path = s3_dir / self.node.table_info.table_name
                dst_path.mkdir(parents=True, exist_ok=True)
                media_files = download_from_s3(self.node.file_path, dst_path)
            else:
                # Local storage.
                media_files = list(iter_path_regex(self.node.file_path))

            # Use parallel validation if there are many files; otherwise the
            # process-pool start-up cost outweighs the gain.
            cpu_count = mp.cpu_count()
            if len(media_files) < cpu_count * 2:
                valid_bitmap = [self._is_media_valid(path) for path in media_files]
            else:
                # TODO: move the pool size to the configuration file later.
                # NOTE(review): pool.map pickles the bound method, i.e. `self`
                # (including `self.db`) — confirm that is picklable.
                # The context manager guarantees the worker processes are shut
                # down instead of leaking after every large load.
                with Pool(cpu_count) as pool:
                    valid_bitmap = pool.map(self._is_media_valid, media_files)

            # Log any invalid files; they are skipped and loading continues
            # with the valid ones.
            invalid_files = [
                str(path)
                for path, is_valid in zip(media_files, valid_bitmap)
                if not is_valid
            ]
            if invalid_files:
                invalid_files_str = "\n".join(invalid_files)
                err_msg = f"no valid file found at -- '{invalid_files_str}'."
                logger.error(err_msg)

            valid_files = [
                str(path)
                for path, is_valid in zip(media_files, valid_bitmap)
                if is_valid
            ]
            if not valid_files:
                raise DatasetFileNotFoundError(
                    f"no file found at -- '{str(self.node.file_path)}'."
                )

            # Reuse an existing table with the same name, otherwise create the
            # catalog entry (and later the storage) for a new one.
            table_info = self.node.table_info
            database_name = table_info.database_name
            table_name = table_info.table_name
            table_obj = self.catalog().get_table_catalog_entry(
                table_name, database_name
            )
            if table_obj:
                msg = f"Adding to an existing table {table_name}."
                logger.info(msg)
            else:
                table_obj = (
                    self.catalog().create_and_insert_multimedia_table_catalog_entry(
                        table_name, self.media_type
                    )
                )
                do_create = True

            storage_engine = StorageEngine.factory(self.db, table_obj)
            if do_create:
                storage_engine.create(table_obj)

            storage_engine.write(
                table_obj,
                Batch(pd.DataFrame({"file_path": valid_files})),
            )

        except Exception as e:
            # If we fail to obtain the storage engine or table object,
            # there is no further action to take.
            if storage_engine and table_obj:
                self._rollback_load(storage_engine, table_obj, do_create)
            err_msg = f"Load {self.media_type.name} failed: {str(e)}"
            # Chain the original exception so its traceback is not lost.
            raise ExecutorError(err_msg) from e
        else:
            yield Batch(
                pd.DataFrame(
                    [
                        f"Number of loaded {self.media_type.name}: {str(len(valid_files))}"
                    ]
                )
            )

    def _rollback_load(
        self,
        storage_engine: AbstractStorageEngine,
        table_obj: TableCatalogEntry,
        do_create: bool,
    ):
        """Undo a failed load.

        Only a table created by the failed call (``do_create`` is True) is
        dropped; a pre-existing table is left as is.
        """
        if do_create:
            storage_engine.drop(table_obj)

    def _is_media_valid(
        self,
        file_path: Path,
    ) -> bool:
        """Return True if ``file_path`` is a valid file of ``self.media_type``.

        Accepts anything ``Path`` can wrap (the caller may pass ``str``).
        """
        return bool(validate_media(Path(file_path), self.media_type))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc