• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pycasbin / postgresql-watcher / 9858920551

09 Jul 2024 02:26PM UTC coverage: 76.959%. First build
9858920551

Pull #29

github

web-flow
Merge c053715c6 into 4b808d0b7
Pull Request #29: feat: fixed `should_reload` behaviour, close PostgreSQL connections, block until `PostgresqlWatcher` is ready, refactorings

132 of 177 new or added lines in 4 files covered. (74.58%)

167 of 217 relevant lines covered (76.96%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.73
/postgresql_watcher/casbin_channel_subscription.py
1
from enum import IntEnum
1✔
2
from logging import Logger
1✔
3
from multiprocessing.connection import Connection
1✔
4
from select import select
1✔
5
from signal import signal, SIGINT, SIGTERM
1✔
6
from time import sleep
1✔
7
from typing import Optional
1✔
8

9
from psycopg2 import connect, extensions, InterfaceError
1✔
10

11

12
CASBIN_CHANNEL_SELECT_TIMEOUT = 1  # seconds
1✔
13

14

15
def casbin_channel_subscription(
    process_conn: Connection,
    logger: Logger,
    host: str,
    user: str,
    password: str,
    channel_name: str,
    port: int = 5432,
    dbname: str = "postgres",
    delay: int = 2,
    sslmode: Optional[str] = None,
    sslrootcert: Optional[str] = None,
    sslcert: Optional[str] = None,
    sslkey: Optional[str] = None,
):
    """
    Listen on a PostgreSQL notification channel and forward events over a pipe.

    The function sleeps for ``delay`` seconds, connects to PostgreSQL, issues
    ``LISTEN`` on ``channel_name`` and then sends
    ``_ChannelSubscriptionMessage.IS_READY`` through ``process_conn``. After that
    it sends one ``_ChannelSubscriptionMessage.RECEIVED_UPDATE`` per notification
    received, until the cursor is closed or an ``InterfaceError`` / ``OSError``
    ends the loop.

    :param process_conn: pipe end used to send the status messages.
    :param logger: logger for debug output and unexpected errors.
    :param host: PostgreSQL host, passed to ``connect``.
    :param user: PostgreSQL user, passed to ``connect``.
    :param password: PostgreSQL password, passed to ``connect``.
    :param channel_name: name of the channel to ``LISTEN`` on.
    :param port: PostgreSQL port, passed to ``connect``.
    :param dbname: database name, passed to ``connect``.
    :param delay: seconds to sleep before connecting.
    :param sslmode: passed to ``connect`` unchanged.
    :param sslrootcert: passed to ``connect`` unchanged.
    :param sslcert: passed to ``connect`` unchanged.
    :param sslkey: passed to ``connect`` unchanged.
    """
    # delay connecting to postgresql (postgresql connection failure)
    sleep(delay)
    db_connection = connect(
        host=host,
        port=port,
        user=user,
        password=password,
        dbname=dbname,
        sslmode=sslmode,
        sslrootcert=sslrootcert,
        sslcert=sslcert,
        sslkey=sslkey,
    )
    try:
        # Can only receive notifications when not in transaction, set this for easier usage
        db_connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        db_cursor = db_connection.cursor()
    except Exception:
        # The connection manager does not own the connection yet, so it has to
        # be closed here to avoid leaking it when the setup fails.
        db_connection.close()
        raise
    context_manager = _ConnectionManager(db_connection, db_cursor)

    with context_manager:
        # NOTE(review): channel_name is interpolated into the statement unquoted.
        # Only pass trusted identifiers; quoting it here would change how
        # PostgreSQL case-folds the name, so it is left as is.
        db_cursor.execute(f"LISTEN {channel_name};")
        logger.debug("Waiting for casbin policy update")
        process_conn.send(_ChannelSubscriptionMessage.IS_READY)

        while not db_cursor.closed:
            try:
                # Wake up at least every CASBIN_CHANNEL_SELECT_TIMEOUT seconds so
                # that a closed cursor is noticed by the loop condition.
                readable, _, _ = select(
                    [db_connection],
                    [],
                    [],
                    CASBIN_CHANNEL_SELECT_TIMEOUT,
                )
                if not readable:
                    continue
                logger.debug("Casbin policy update identified")
                db_connection.poll()
                while db_connection.notifies:
                    notify = db_connection.notifies.pop(0)
                    logger.debug("Notify: %s", notify.payload)
                    process_conn.send(_ChannelSubscriptionMessage.RECEIVED_UPDATE)
            except (InterfaceError, OSError) as e:
                # Log an exception if these errors occurred without the context being closed
                if not context_manager.connections_were_closed:
                    logger.critical(e, exc_info=True)
                break
×
73

74

75
class _ChannelSubscriptionMessage(IntEnum):
    """Status messages that the channel subscription sends through its pipe."""

    # Sent once, right after the LISTEN statement has been executed.
    IS_READY = 1
    # Sent once for every notification taken from the connection.
    RECEIVED_UPDATE = 2
1✔
78

79

80
class _ConnectionManager:
    """
    You can not use 'with' and a connection / cursor directly in this setup.
    For more details see this issue: https://github.com/psycopg/psycopg2/issues/941#issuecomment-864025101.
    As a workaround this connection manager / context manager class is used, that also handles SIGINT and SIGTERM and
    closes the database connection.

    The SIGINT and SIGTERM handlers that were active before entering the context are reinstated when the context is
    left, so that later signals are not swallowed by the (then no-op) closing routine.
    """

    def __init__(self, connection, cursor) -> None:
        """
        Store the connection and cursor that are closed when the context ends.

        :param connection: object with a ``close()`` method, closed after the cursor.
        :param cursor: object with a ``close()`` method, closed first.
        """
        self.connection = connection
        self.cursor = cursor
        # True as soon as _close_connections has completed once
        self.connections_were_closed = False
        # Handlers replaced in __enter__, keyed by signal number
        self._previous_handlers = {}

    def __enter__(self):
        """Install the closing routine as SIGINT / SIGTERM handler and return self."""
        for signal_number in (SIGINT, SIGTERM):
            self._previous_handlers[signal_number] = signal(
                signal_number, self._close_connections
            )
        return self

    def _close_connections(self, *_):
        """
        Close cursor and connection, each at most once.

        Accepts and ignores arbitrary positional arguments so that it can be used as a signal handler.
        """
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.connection is not None:
            self.connection.close()
            self.connection = None
        self.connections_were_closed = True

    def __exit__(self, *_):
        """Close cursor and connection, then reinstate the previous signal handlers."""
        try:
            self._close_connections()
        finally:
            for signal_number, handler in self._previous_handlers.items():
                # signal() reports None for handlers that were not installed from Python;
                # those can not be passed back to signal().
                if handler is not None:
                    signal(signal_number, handler)
            self._previous_handlers = {}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc