• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / bf991521-4cdb-4552-b271-e6234b781c4c

31 Aug 2023 11:50PM UTC coverage: 79.906% (-2.0%) from 81.924%
bf991521-4cdb-4552-b271-e6234b781c4c

push

circle-ci

Ankith Reddy Chitti
sqlite handler

51 of 51 new or added lines in 4 files covered. (100.0%)

8987 of 11247 relevant lines covered (79.91%)

1.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/third_party/databases/postgres/postgres_handler.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import pandas as pd
×
16
import psycopg2
×
17

18
from evadb.third_party.databases.types import (
×
19
    DBHandler,
20
    DBHandlerResponse,
21
    DBHandlerStatus,
22
)
23

24

25
class PostgresHandler(DBHandler):
    """
    Database handler backed by a PostgreSQL server, accessed through psycopg2.

    The handler owns at most one connection at a time. Connection failures are
    reported through the returned DBHandlerStatus / DBHandlerResponse objects
    rather than raised; database errors hit while running a query are handled
    the same way.
    """

    # Type names as reported by information_schema.columns.data_type, mapped to
    # the Python type used to represent them. Built once instead of per call.
    _PG_TYPE_MAPPING = {
        "integer": int,
        "bigint": int,
        "smallint": int,
        "numeric": float,
        "real": float,
        "double precision": float,
        "character": str,
        "character varying": str,
        "text": str,
        "boolean": bool,
        # Add more mappings as needed
    }

    def __init__(self, name: str, **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of the DB handler instance
            **kwargs: arbitrary keyword arguments for establishing the connection.
                The keys read are "host", "port", "user", "password" and
                "database"; a missing key is stored as None.
        """
        super().__init__(name)
        self.host = kwargs.get("host")
        self.port = kwargs.get("port")
        self.user = kwargs.get("user")
        self.password = kwargs.get("password")
        self.database = kwargs.get("database")
        # Stays None until connect() succeeds, and is reset by disconnect().
        self.connection = None

    def connect(self) -> DBHandlerStatus:
        """
        Set up the connection required by the handler.
        Returns:
            DBHandlerStatus: status=True on success; status=False with the
            psycopg2 error text otherwise.
        """
        try:
            self.connection = psycopg2.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            # Every statement is committed immediately; the handler never
            # issues an explicit commit.
            self.connection.autocommit = True
            return DBHandlerStatus(status=True)
        except psycopg2.Error as e:
            return DBHandlerStatus(status=False, error=str(e))

    def disconnect(self) -> None:
        """
        Close any existing connections.

        The reference is dropped after closing so that check_connection() and
        the query methods report "not connected" instead of operating on a
        closed connection.
        """
        if self.connection:
            try:
                self.connection.close()
            finally:
                self.connection = None

    def check_connection(self) -> DBHandlerStatus:
        """
        Check connection to the handler.
        Returns:
            DBHandlerStatus: status=True if a connection object is held,
            otherwise status=False with an explanatory error.
        """
        if self.connection:
            return DBHandlerStatus(status=True)
        return DBHandlerStatus(status=False, error="Not connected to the database.")

    def get_tables(self) -> DBHandlerResponse:
        """
        Return the list of tables in the database.
        Returns:
            DBHandlerResponse: data is a DataFrame with a "table_name" column
            listing tables outside the information_schema and pg_catalog
            schemas; on failure data is None and error holds the message.
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            query = "SELECT table_name FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'pg_catalog')"
            tables_df = pd.read_sql_query(query, self.connection)
            return DBHandlerResponse(data=tables_df)
        # pandas re-raises execution failures as its own DatabaseError, so
        # catching psycopg2.Error alone would let those escape.
        except (psycopg2.Error, pd.io.sql.DatabaseError) as e:
            return DBHandlerResponse(data=None, error=str(e))

    def get_columns(self, table_name: str) -> DBHandlerResponse:
        """
        Returns the list of columns for the given table.
        Args:
            table_name (str): name of the table whose columns are to be retrieved.
        Returns:
            DBHandlerResponse: data is a DataFrame with columns "name" and
            "dtype" (a Python type); on failure data is None and error holds
            the message.
        Raises:
            Exception: if the table has a column whose Postgres type has no
            Python mapping (see _pg_to_python_types).
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            # The table name is passed as a bound parameter, never interpolated
            # into the SQL text, so quotes in it cannot alter the statement.
            query = "SELECT column_name as name, data_type as dtype FROM information_schema.columns WHERE table_name=%s"
            columns_df = pd.read_sql_query(
                query, self.connection, params=(table_name,)
            )
            columns_df["dtype"] = columns_df["dtype"].apply(self._pg_to_python_types)
            return DBHandlerResponse(data=columns_df)
        # pandas re-raises execution failures as its own DatabaseError, so
        # catching psycopg2.Error alone would let those escape.
        except (psycopg2.Error, pd.io.sql.DatabaseError) as e:
            return DBHandlerResponse(data=None, error=str(e))

    def _fetch_results_as_df(self, cursor) -> pd.DataFrame:
        """
        Convert the outcome of the statement last executed on `cursor` into a
        DataFrame.

        Reference to Postgres API: https://www.psycopg.org/docs/cursor.html

        Statements such as CREATE or a plain INSERT produce no result set, and
        calling fetchall() on them raises a ProgrammingError. rowcount cannot
        tell them apart from SELECT (it is non-zero for both INSERT and
        SELECT), but cursor.description can: it is None exactly when the
        statement returned no rows to fetch.

        Args:
            cursor: a psycopg2 cursor on which a statement has been executed.
        Returns:
            pd.DataFrame: the fetched rows with the result column names, or a
            single-row frame {"status": ["success"]} when there is no result
            set.
        """
        description = cursor.description
        if description is None:
            return pd.DataFrame({"status": ["success"]})

        rows = cursor.fetchall()
        # The first item of each description entry is the column name.
        return pd.DataFrame(rows, columns=[desc[0] for desc in description])

    def execute_native_query(self, query_string: str) -> DBHandlerResponse:
        """
        Executes the native query on the database.
        Args:
            query_string (str): query in native format
        Returns:
            DBHandlerResponse: data is the result as a DataFrame (or a
            one-row "status" frame for statements without a result set); on a
            psycopg2 error data is None and error holds the message.
        """
        if not self.connection:
            return DBHandlerResponse(data=None, error="Not connected to the database.")

        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute(query_string)
                return DBHandlerResponse(data=self._fetch_results_as_df(cursor))
            finally:
                # Release the cursor whether or not the statement succeeded.
                cursor.close()
        except psycopg2.Error as e:
            return DBHandlerResponse(data=None, error=str(e))

    def _pg_to_python_types(self, pg_type: str):
        """
        Translate a Postgres type name into the matching Python type.
        Args:
            pg_type (str): type name as found in
                information_schema.columns.data_type.
        Returns:
            type: one of int, float, str or bool.
        Raises:
            Exception: if `pg_type` has no entry in the mapping.
        """
        try:
            return self._PG_TYPE_MAPPING[pg_type]
        except KeyError:
            raise Exception(
                f"Unsupported column {pg_type} encountered in the postgres table. Please raise a feature request!"
            ) from None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc