• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / d65c6213-51c3-43ff-bbf7-2f70c812864d

pending completion
d65c6213-51c3-43ff-bbf7-2f70c812864d

Pull #551

circle-ci

jarulraj
fixes
Pull Request #551: feat: add support for aggregates and toxicity classification

88 of 88 new or added lines in 10 files covered. (100.0%)

8210 of 8736 relevant lines covered (93.98%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.07
/eva/executor/load_multimedia_executor.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from pathlib import Path
1✔
16

17
import pandas as pd
1✔
18

19
from eva.catalog.catalog_manager import CatalogManager
1✔
20
from eva.catalog.models.table_catalog import TableCatalog
1✔
21
from eva.executor.abstract_executor import AbstractExecutor
1✔
22
from eva.executor.executor_utils import ExecutorError, iter_path_regex, validate_media
1✔
23
from eva.models.storage.batch import Batch
1✔
24
from eva.plan_nodes.load_data_plan import LoadDataPlan
1✔
25
from eva.storage.abstract_storage_engine import AbstractStorageEngine
1✔
26
from eva.storage.storage_engine import StorageEngine
1✔
27
from eva.utils.logging_manager import logger
1✔
28

29

30
class LoadMultimediaExecutor(AbstractExecutor):
    """Executor that loads media files into a table.

    Files matching the plan's ``file_path`` are validated against the
    requested media type and their paths are written to the target table
    through the storage engine. If the table has no catalog entry yet, one
    is created together with its storage.
    """

    def __init__(self, node: LoadDataPlan):
        """Store the plan node and read the media type from its file options.

        Args:
            node: load plan; ``node.file_options["file_format"]`` supplies
                the media type used for validation and messages.
        """
        super().__init__(node)
        self.catalog = CatalogManager()
        self.media_type = self.node.file_options["file_format"]

    def validate(self):
        """No-op; file validation happens inside :meth:`exec`."""
        pass

    def exec(self):
        """Validate the matched files and write their paths to the table.

        This is a generator: nothing runs until it is iterated.

        Yields:
            Batch: a single batch holding a one-row summary message with the
            number of loaded files.

        Raises:
            ExecutorError: if any file is invalid or any step of the load
                fails. The original exception is chained as ``__cause__``.
        """
        # Bind the rollback state before entering the try block. The except
        # handler reads all three names, and a failure early in the try body
        # (e.g. an invalid file) would otherwise surface as an
        # UnboundLocalError instead of the intended ExecutorError.
        storage_engine = None
        table_obj = None
        do_create = False
        try:
            valid_files = []
            for file_path in iter_path_regex(self.node.file_path):
                file_path = Path(file_path)
                if validate_media(file_path, self.media_type):
                    valid_files.append(str(file_path))
                else:
                    err_msg = f"Load {self.media_type.name} failed due to invalid file {str(file_path)}"
                    logger.error(err_msg)
                    raise ValueError(file_path)

            table_info = self.node.table_info
            database_name = table_info.database_name
            table_name = table_info.table_name

            # Reuse the table if it already has a catalog entry; otherwise
            # create the entry and remember that storage must be created too.
            table_obj = self.catalog.get_table_catalog_entry(table_name, database_name)
            if table_obj:
                msg = f"Adding to an existing table {table_name}."
                logger.info(msg)
            else:
                table_obj = (
                    self.catalog.create_and_insert_multimedia_table_catalog_entry(
                        table_name, self.media_type
                    )
                )
                do_create = True

            storage_engine = StorageEngine.factory(table_obj)
            if do_create:
                success = storage_engine.create(table_obj)
                if not success:
                    raise ExecutorError(
                        f"StorageEngine {storage_engine} create call failed"
                    )
            storage_engine.write(
                table_obj,
                Batch(pd.DataFrame({"file_path": valid_files})),
            )

        except Exception as e:
            self._rollback_load(storage_engine, table_obj, do_create)
            err_msg = f"Load {self.media_type.name} failed: encountered unexpected error {str(e)}"
            logger.error(err_msg)
            # Chain explicitly so the root cause stays attached to the error.
            raise ExecutorError(err_msg) from e
        else:
            yield Batch(
                pd.DataFrame(
                    [
                        f"Number of loaded {self.media_type.name}: {str(len(valid_files))}"
                    ]
                )
            )

    def _rollback_load(
        self,
        storage_engine: AbstractStorageEngine,
        table_obj: TableCatalog,
        do_create: bool,
    ):
        """Best-effort cleanup after a failed load.

        Drops the table from storage only when this load created it. Any
        error raised while dropping is logged and not re-raised.

        Args:
            storage_engine: engine used for the load; ``None`` is tolerated
                when the load failed before an engine was obtained.
            table_obj: catalog entry of the target table (may be ``None``
                if the load failed before it was resolved).
            do_create: whether this load created the table.
        """
        try:
            # Nothing can be dropped from storage if no engine was obtained.
            # NOTE(review): a catalog entry created by this load is not
            # removed here -- confirm whether it needs cleanup as well.
            if do_create and storage_engine is not None:
                storage_engine.drop(table_obj)
        except Exception as e:
            logger.exception(
                f"Unexpected Exception {e} occured while rolling back. This is bad as the {self.media_type.name} table can be in a corrupt state. Please verify the table {table_obj} for correctness."
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc