• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #754

04 Sep 2023 09:54PM UTC coverage: 74.807% (-5.5%) from 80.336%
#754

push

circle-ci

jiashenC
update case

8727 of 11666 relevant lines covered (74.81%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.75
/evadb/executor/load_csv_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import pandas as pd
1✔
16

17
from evadb.database import EvaDBDatabase
1✔
18
from evadb.executor.abstract_executor import AbstractExecutor
1✔
19
from evadb.executor.executor_utils import ExecutorError
1✔
20
from evadb.expression.tuple_value_expression import TupleValueExpression
1✔
21
from evadb.models.storage.batch import Batch
1✔
22
from evadb.plan_nodes.load_data_plan import LoadDataPlan
1✔
23
from evadb.readers.csv_reader import CSVReader
1✔
24
from evadb.storage.storage_engine import StorageEngine
1✔
25
from evadb.utils.logging_manager import logger
1✔
26

27

28
class LoadCSVExecutor(AbstractExecutor):
    """Executor that loads the rows of a CSV file into an existing table."""

    def __init__(self, db: EvaDBDatabase, node: LoadDataPlan):
        super().__init__(db, node)

    def exec(self, *args, **kwargs):
        """
        Load the CSV file named by the plan node into its target table.

        The file is consumed batch by batch through ``CSVReader`` and each
        batch is written with the storage engine created for the table.
        Once every batch has been written, a single one-row ``Batch``
        summarising the load (file path and number of loaded rows) is
        yielded.

        Raises:
            ExecutorError: if the catalog has no entry for the target table.
                Because this method is a generator, the error surfaces when
                the generator is first advanced, not when it is created.
        """
        # Resolve the target table through the catalog.
        target = self.node.table_info
        db_name = target.database_name
        tbl_name = target.table_name
        table_entry = self.catalog().get_table_catalog_entry(tbl_name, db_name)
        if table_entry is None:
            message = f"{tbl_name} does not exist."
            logger.error(message)
            raise ExecutorError(message)

        # Build one tuple-value expression per column of the table; the
        # reader receives them as its column list.
        columns = [
            TupleValueExpression(
                name=col.name,
                table_alias=table_entry.name.lower(),
                col_object=col,
            )
            for col in table_entry.columns
        ]

        reader = CSVReader(
            self.node.file_path,
            column_list=columns,
            batch_mem_size=self.node.batch_mem_size,
        )
        engine = StorageEngine.factory(self.db, table_entry)

        # Persist each batch as it is read and keep a running row count.
        loaded = 0
        for chunk in reader.read():
            engine.write(table_entry, chunk)
            loaded += len(chunk)

        # Report what was loaded as a single-row batch.
        summary = pd.DataFrame(
            {
                "CSV": str(self.node.file_path),
                "Number of loaded frames": loaded,
            },
            index=[0],
        )
        yield Batch(summary)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc