• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 2fdc9ac8-ccff-4e89-aebb-f7eb50cc437b

21 Sep 2023 05:23PM UTC coverage: 93.203% (+0.5%) from 92.73%
2fdc9ac8-ccff-4e89-aebb-f7eb50cc437b

push

circle-ci

web-flow
Merge branch 'georgia-tech-db:master' into master

462 of 462 new or added lines in 53 files covered. (100.0%)

11176 of 11991 relevant lines covered (93.2%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.84
/evadb/third_party/databases/postgres/postgres_handler.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import pandas as pd
1✔
16
import psycopg2
1✔
17

18
from evadb.third_party.databases.types import (
1✔
19
    DBHandler,
20
    DBHandlerResponse,
21
    DBHandlerStatus,
22
)
23

24

25
class PostgresHandler(DBHandler):
    """DB handler that talks to a PostgreSQL server through psycopg2."""

    # Maps information_schema ``data_type`` names to Python types.
    # Add more mappings as needed.
    _PG_TYPE_TO_PYTHON = {
        "integer": int,
        "bigint": int,
        "smallint": int,
        "numeric": float,
        "real": float,
        "double precision": float,
        "character": str,
        "character varying": str,
        "text": str,
        "boolean": bool,
    }

    def __init__(self, name: str, **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of the DB handler instance
            **kwargs: connection parameters. The keys read are ``host``,
                ``port``, ``user``, ``password`` and ``database``; missing
                keys are stored as None.
        """
        super().__init__(name)
        self.host = kwargs.get("host")
        self.port = kwargs.get("port")
        self.user = kwargs.get("user")
        self.password = kwargs.get("password")
        self.database = kwargs.get("database")
        # Set by connect(); reset to None by disconnect().
        self.connection = None

    def connect(self) -> DBHandlerStatus:
        """
        Open a psycopg2 connection using the stored parameters.

        The connection is switched to autocommit mode, so statements issued
        through execute_native_query() take effect without an explicit commit.

        Returns:
            DBHandlerStatus: status=True on success, otherwise status=False
            with the psycopg2 error message.
        """
        try:
            self.connection = psycopg2.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            self.connection.autocommit = True
            return DBHandlerStatus(status=True)
        except psycopg2.Error as e:
            return DBHandlerStatus(status=False, error=str(e))

    def disconnect(self):
        """
        Close the current connection, if any.

        The attribute is reset to None afterwards so that check_connection()
        and the query methods report "Not connected to the database." instead
        of operating on a closed psycopg2 connection.
        """
        if self.connection is not None:
            try:
                self.connection.close()
            finally:
                self.connection = None

    def get_sqlalchmey_uri(self) -> str:
        """
        Build a SQLAlchemy connection URL for this database.

        The (misspelled) method name is kept unchanged for backward
        compatibility. User name and password are percent-encoded so that
        characters such as ``@``, ``:``, ``/`` or ``%`` do not break the
        URL structure.

        Returns:
            str: a ``postgresql+psycopg2://`` URL.
        """
        from urllib.parse import quote_plus

        user = quote_plus(str(self.user))
        password = quote_plus(str(self.password))
        return (
            f"postgresql+psycopg2://{user}:{password}"
            f"@{self.host}:{self.port}/{self.database}"
        )

    def check_connection(self) -> DBHandlerStatus:
        """
        Check connection to the handler.

        A connection counts as alive when one has been opened and psycopg2
        does not flag it as closed.

        Returns:
            DBHandlerStatus
        """
        if self.connection is not None and not self.connection.closed:
            return DBHandlerStatus(status=True)
        return DBHandlerStatus(status=False, error="Not connected to the database.")

    def get_tables(self) -> DBHandlerResponse:
        """
        Return the list of tables in the database.

        System schemas (``information_schema`` and ``pg_catalog``) are
        excluded.

        Returns:
            DBHandlerResponse: ``data`` is a DataFrame with a ``table_name``
            column, or ``error`` holds the failure message.
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            query = (
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema NOT IN ('information_schema', 'pg_catalog')"
            )
            tables_df = pd.read_sql_query(query, self.connection)
            return DBHandlerResponse(data=tables_df)
        except psycopg2.Error as e:
            return DBHandlerResponse(data=None, error=str(e))

    def get_columns(self, table_name: str) -> DBHandlerResponse:
        """
        Returns the list of columns for the given table.

        The table name is passed as a bound query parameter rather than
        interpolated into the SQL text, so quotes or other SQL in the name
        cannot alter the query.

        Args:
            table_name (str): name of the table whose columns are to be retrieved.
        Returns:
            DBHandlerResponse: ``data`` is a DataFrame with ``name`` and
            ``dtype`` columns (``dtype`` mapped to Python types), or ``error``
            holds the psycopg2 failure message.
        Raises:
            Exception: if the table contains a column type that has no
                Python mapping (see _pg_to_python_types).
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            query = (
                "SELECT column_name as name, data_type as dtype "
                "FROM information_schema.columns "
                "WHERE table_name = %(table_name)s"
            )
            columns_df = pd.read_sql_query(
                query, self.connection, params={"table_name": table_name}
            )
            columns_df["dtype"] = columns_df["dtype"].apply(self._pg_to_python_types)
            return DBHandlerResponse(data=columns_df)
        except psycopg2.Error as e:
            return DBHandlerResponse(data=None, error=str(e))

    def _fetch_results_as_df(self, cursor):
        """
        Convert the result of the last statement executed on ``cursor`` into
        a DataFrame.

        psycopg2 sets ``cursor.description`` to None for statements that do
        not return rows (e.g. CREATE, or INSERT without RETURNING). That
        distinguishes them from row-returning statements without having to
        call fetchall() and inspect the resulting error.
        Reference: https://www.psycopg.org/docs/cursor.html#cursor.description

        Args:
            cursor: a psycopg2 cursor on which execute() has been called.
        Returns:
            pd.DataFrame: the fetched rows with lower-cased column names, or
            ``{"status": ["success"]}`` when the statement returned no rows.
        """
        if cursor.description is None:
            return pd.DataFrame({"status": ["success"]})
        rows = cursor.fetchall()
        column_names = [desc[0].lower() for desc in cursor.description]
        return pd.DataFrame(rows, columns=column_names)

    def execute_native_query(self, query_string: str) -> DBHandlerResponse:
        """
        Executes the native query on the database.

        The cursor is used as a context manager so it is closed once the
        results have been fetched, including when an error is raised.

        Args:
            query_string (str): query in native format
        Returns:
            DBHandlerResponse
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query_string)
                # Fetch while the cursor is still open.
                return DBHandlerResponse(data=self._fetch_results_as_df(cursor))
        except psycopg2.Error as e:
            return DBHandlerResponse(data=None, error=str(e))

    def _pg_to_python_types(self, pg_type: str):
        """
        Map a PostgreSQL ``data_type`` name to a Python type.

        Args:
            pg_type (str): type name as reported by information_schema.columns.
        Returns:
            type: the corresponding Python type.
        Raises:
            Exception: if ``pg_type`` has no mapping.
        """
        python_type = self._PG_TYPE_TO_PYTHON.get(pg_type)
        if python_type is None:
            raise Exception(
                f"Unsupported column {pg_type} encountered in the postgres table. Please raise a feature request!"
            )
        return python_type
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc