• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

informatics-isi-edu / deriva-groups / 16839615317

08 Aug 2025 08:12PM UTC coverage: 79.094% (-4.2%) from 83.306%
16839615317

Pull #1

github

web-flow
Merge d0c1cdb7e into f4d191ead
Pull Request #1: postgresql storage backend

0 of 96 new or added lines in 1 file covered. (0.0%)

1502 of 1899 relevant lines covered (79.09%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/deriva/web/groups/api/storage/backends/postgresql.py
1
#
2
# Copyright 2025 University of Southern California
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#    http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
#
NEW
16
import os
×
NEW
17
import psycopg2
×
NEW
18
import psycopg2.pool
×
NEW
19
import logging
×
NEW
20
import time
×
NEW
21
import fnmatch
×
NEW
22
from typing import Optional, List, Iterable, Union
×
23

NEW
24
logger = logging.getLogger(__name__)
×
25

NEW
26
class connection (psycopg2.extensions.connection):
    """Customized psycopg2 connection factory.

    Each newly opened connection runs idempotent schema setup (creating the
    ``deriva_groups`` table if it is missing) and prepares the named
    statements used by the storage backend, then commits.
    """
    def __init__(self, dsn):
        """Open the connection for ``dsn`` and run the one-time setup steps."""
        super().__init__(dsn)
        logger.debug(f"Initializing new connection for PostgreSQL: dsn={self.dsn}")
        with self.cursor() as setup_cur:
            # Order matters: the table must exist before statements that
            # reference it can be prepared.
            for setup_step in (self._idempotent_ddl, self._prepare_stmts):
                setup_step(setup_cur)
            self.commit()
        logger.debug("Initialization complete")

    def _idempotent_ddl(self, cur):
        """Create the key/value table unless it already exists."""
        ddl = """
        CREATE TABLE IF NOT EXISTS deriva_groups (
          key text PRIMARY KEY,
          value bytea NOT NULL,
          expires_at float8
        );
        """
        cur.execute(ddl)

    def _prepare_stmts(self, cur):
        """Drop any existing prepared statements and prepare the set/get/list/delete statements."""
        prepare_sql = """
        DEALLOCATE PREPARE ALL;

        PREPARE deriva_groups_session_set(text, bytea, float8) AS
        INSERT INTO deriva_groups (key, value, expires_at)
        VALUES($1, $2, $3)
        ON CONFLICT (key)
        DO UPDATE SET value = EXCLUDED.value, expires_at = EXCLUDED.expires_at;

        PREPARE deriva_groups_session_get(text) AS
        SELECT value, expires_at FROM deriva_groups WHERE key = $1;

        PREPARE deriva_groups_session_get_expires(text) AS
        SELECT expires_at FROM deriva_groups WHERE key = $1;

        PREPARE deriva_groups_session_list AS
        SELECT key, expires_at FROM deriva_groups;

        PREPARE deriva_groups_session_delete(text) AS
        DELETE FROM deriva_groups WHERE key = $1;
        """
        cur.execute(prepare_sql)
71

NEW
72
class PostgreSQLBackend:
    """
    A simple PostgreSQL-based key-value store with TTL support and pooled psycopg2 connections.

    Expiry is tracked as an absolute ``time.time()`` timestamp per key, with
    ``None`` meaning the key never expires. Expired rows are removed lazily
    when ``get`` or ``keys`` encounters them. Every statement runs in its own
    transaction on a connection borrowed from the pool.
    """
    def __init__(self, url: str = "postgresql:///derivagrps", idle_timeout: int = 60):
        """Create the connection pool.

        url: connection string handed to the pool as ``dsn``.
        idle_timeout: stored on the instance; not otherwise used by this class.
        """
        # TODO: figure out what idle_timeout would even mean here with pooling??
        # TODO: add configuration for minconn, maxconn here?
        self.dsn = url
        minconn = 1 # need to keep an idle connection open to really benefit from pool?
        maxconn = 4
        self.pool = psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, dsn=url, connection_factory=connection)
        # NOTE(review): url is logged verbatim; if it can embed a password, consider redacting it.
        logger.debug(f"Using threaded connection pool for PostgreSQL: minconn={minconn} maxconn={maxconn} url={self.dsn}")
        self.idle_timeout = idle_timeout

    @staticmethod
    def _to_blob(value: Union[str, bytes]) -> Union[bytes, bytearray]:
        """Return ``value`` unchanged if it is already binary, otherwise encode the string."""
        return value if isinstance(value, (bytes, bytearray)) else value.encode()

    def _get_conn(self):
        """Borrow a connection from the pool and set it to REPEATABLE READ isolation."""
        conn = self.pool.getconn()
        try:
            logger.debug("Got pooled connection dsn=%s status=%s", conn.dsn, conn.status)
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ)
        except Exception:
            # Do not leak the pool slot when the connection cannot be
            # configured; discard it so the pool can open a fresh one.
            self.pool.putconn(conn, close=True)
            raise
        return conn

    def _put_conn(self, conn):
        """Return a borrowed connection to the pool; ``None`` is ignored."""
        if conn is not None:
            logger.debug("Returning connection to pool dsn=%s status=%s", conn.dsn, conn.status)
            self.pool.putconn(conn)

    def close(self):
        """
        Close the backend and clear resources.

        Safe to call more than once: the pool reference is cleared before the
        pooled connections are closed.
        """
        pool, self.pool = self.pool, None
        if pool is not None:
            logger.debug(f"Shutting down connection pool for dsn={self.dsn}")
            pool.closeall()

    def _pooled_execute_stmt(self, sql, params, resultfunc=lambda cur: None):
        """Execute and commit one statement on a pooled connection, returning result of resultfunc applied to cursor.

        The connection is always handed back to the pool, even when the
        statement, ``resultfunc`` or the commit raises; in that case the
        transaction is rolled back first and the original exception propagates.
        """
        conn = self._get_conn()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                result = resultfunc(cur)
            conn.commit()
            return result
        except Exception:
            # Abort the failed transaction so the connection is clean for its
            # next user; a failing rollback must not mask the original error.
            try:
                conn.rollback()
            except Exception:
                logger.exception("Rollback failed on pooled connection")
            raise
        finally:
            self._put_conn(conn)

    def setex(self, key: str, value: Union[str, bytes], ttl: int) -> None:
        """Store ``value`` under ``key`` so that it expires ``ttl`` seconds from now."""
        expires_at = time.time() + ttl
        self._pooled_execute_stmt(
            "EXECUTE deriva_groups_session_set(%s, %s, %s);",
            (key, self._to_blob(value), expires_at)
        )

    def get(self, key: str) -> Optional[bytes]:
        """Return the stored bytes for ``key``, or ``None`` if it is missing or expired.

        An expired row is deleted as a side effect.
        """
        row = self._pooled_execute_stmt(
            "EXECUTE deriva_groups_session_get(%s);",
            (key,),
            lambda cur: cur.fetchone()
        )
        if not row:
            return None
        value, expires_at = row
        if expires_at is not None and time.time() >= expires_at:
            # NOTE(review): a concurrent setex() between the read above and
            # this delete would lose its fresh value; confirm this is acceptable.
            self.delete(key)
            return None
        # bytes() accepts both memoryview and bytes, unlike .tobytes().
        return bytes(value)

    def delete(self, key: str) -> None:
        """Remove ``key`` if present."""
        self._pooled_execute_stmt(
            "EXECUTE deriva_groups_session_delete(%s);",
            (key,)
        )

    def keys(self, pattern: str) -> List[str]:
        """Return the unexpired keys matching the ``fnmatch``-style ``pattern``.

        Every expired row seen during the listing is deleted, whether or not
        it matches ``pattern``.
        """
        rows = self._pooled_execute_stmt(
            "EXECUTE deriva_groups_session_list;",
            None,
            lambda cur: list(cur)
        )
        now = time.time()
        result = []
        for key, expires_at in rows:
            if expires_at is not None and now >= expires_at:
                self.delete(key)
                continue
            if fnmatch.fnmatch(key, pattern):
                result.append(key)
        return result

    def scan_iter(self, pattern: str) -> Iterable[str]:
        """Yield the keys matching ``pattern``; see ``keys``."""
        yield from self.keys(pattern)

    def exists(self, key: str) -> bool:
        """Return True if ``key`` is present and not expired."""
        return self.get(key) is not None

    def ttl(self, key: str) -> int:
        """Return the remaining lifetime of ``key`` in whole seconds.

        Returns -2 if the key does not exist or has already expired, and -1 if
        the key exists but has no expiry.
        """
        row = self._pooled_execute_stmt(
            "EXECUTE deriva_groups_session_get_expires(%s);",
            (key,),
            lambda cur: cur.fetchone()
        )
        if not row:
            return -2  # key does not exist
        expires_at, = row
        if expires_at is None:
            return -1  # no TTL set
        remaining = expires_at - time.time()
        # Check expiry before truncating: int() rounds toward zero, so a key
        # expired by less than a second would otherwise be reported as 0.
        if remaining <= 0:
            return -2
        return int(remaining)

    def set(self, key: str, value: Union[str, bytes]) -> None:
        """Store ``value`` under ``key`` with no expiry."""
        self._pooled_execute_stmt(
            "EXECUTE deriva_groups_session_set(%s, %s, %s);",
            (key, self._to_blob(value), None)
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc