• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pycasbin / postgresql-watcher / 9852615207

09 Jul 2024 07:12AM UTC coverage: 73.148%. First build
9852615207

Pull #27

github

web-flow
Merge 953b68695 into 4b808d0b7
Pull Request #27: feat: calling update_callback fun if set and updated docs

0 of 3 new or added lines in 1 file covered. (0.0%)

79 of 108 relevant lines covered (73.15%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.29
/postgresql_watcher/watcher.py
1
from typing import Optional, Callable
1✔
2
from psycopg2 import connect, extensions
1✔
3
from multiprocessing import Process, Pipe
1✔
4
import time
1✔
5
from select import select
1✔
6
from logging import Logger, getLogger
1✔
7

8

9
POSTGRESQL_CHANNEL_NAME = "casbin_role_watcher"
1✔
10

11

12
def casbin_subscription(
1✔
13
    process_conn: Pipe,
14
    logger: Logger,
15
    host: str,
16
    user: str,
17
    password: str,
18
    port: Optional[int] = 5432,
19
    dbname: Optional[str] = "postgres",
20
    delay: Optional[int] = 2,
21
    channel_name: Optional[str] = POSTGRESQL_CHANNEL_NAME,
22
    sslmode: Optional[str] = None,
23
    sslrootcert: Optional[str] = None,
24
    sslcert: Optional[str] = None,
25
    sslkey: Optional[str] = None,
26
):
27
    # delay connecting to postgresql (postgresql connection failure)
28
    time.sleep(delay)
×
29
    conn = connect(
×
30
        host=host,
31
        port=port,
32
        user=user,
33
        password=password,
34
        dbname=dbname,
35
        sslmode=sslmode,
36
        sslrootcert=sslrootcert,
37
        sslcert=sslcert,
38
        sslkey=sslkey
39
    )
40
    # Can only receive notifications when not in transaction, set this for easier usage
41
    conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
×
42
    curs = conn.cursor()
×
43
    curs.execute(f"LISTEN {channel_name};")
×
44
    logger.debug("Waiting for casbin policy update")
×
45
    while True and not curs.closed:
×
46
        if not select([conn], [], [], 5) == ([], [], []):
×
47
            logger.debug("Casbin policy update identified..")
×
48
            conn.poll()
×
49
            while conn.notifies:
×
50
                notify = conn.notifies.pop(0)
×
51
                logger.debug(f"Notify: {notify.payload}")
×
52
                process_conn.send(notify.payload)
×
53

54

55
class PostgresqlWatcher(object):
1✔
56
    def __init__(
1✔
57
        self,
58
        host: str,
59
        user: str,
60
        password: str,
61
        port: Optional[int] = 5432,
62
        dbname: Optional[str] = "postgres",
63
        channel_name: Optional[str] = POSTGRESQL_CHANNEL_NAME,
64
        start_process: Optional[bool] = True,
65
        sslmode: Optional[str] = None,
66
        sslrootcert: Optional[str] = None,
67
        sslcert: Optional[str] = None,
68
        sslkey: Optional[str] = None,
69
        logger: Optional[Logger] = None,
70
    ):
71
        self.update_callback = None
1✔
72
        self.parent_conn = None
1✔
73
        self.host = host
1✔
74
        self.port = port
1✔
75
        self.user = user
1✔
76
        self.password = password
1✔
77
        self.dbname = dbname
1✔
78
        self.channel_name = channel_name
1✔
79
        self.sslmode = sslmode
1✔
80
        self.sslrootcert = sslrootcert
1✔
81
        self.sslcert = sslcert
1✔
82
        self.sslkey = sslkey
1✔
83
        if logger is None:
1✔
84
            logger = getLogger()
1✔
85
        self.logger = logger
1✔
86
        self.subscribed_process = self.create_subscriber_process(start_process)
1✔
87

88
    def create_subscriber_process(
1✔
89
        self,
90
        start_process: Optional[bool] = True,
91
        delay: Optional[int] = 2,
92
    ):
93
        parent_conn, child_conn = Pipe()
1✔
94
        if not self.parent_conn:
1✔
95
            self.parent_conn = parent_conn
1✔
96
        p = Process(
1✔
97
            target=casbin_subscription,
98
            args=(
99
                child_conn,
100
                self.logger,
101
                self.host,
102
                self.user,
103
                self.password,
104
                self.port,
105
                self.dbname,
106
                delay,
107
                self.channel_name,
108
                self.sslmode,
109
                self.sslrootcert,
110
                self.sslcert,
111
                self.sslkey,
112
            ),
113
            daemon=True,
114
        )
115
        if start_process:
1✔
116
            p.start()
1✔
117
        return p
1✔
118

119
    def set_update_callback(self, fn_name: Callable):
1✔
120
        self.logger.debug(f"runtime is set update callback {fn_name}")
1✔
121
        self.update_callback = fn_name
1✔
122

123
    def update(self):
1✔
124
        conn = connect(
1✔
125
            host=self.host,
126
            port=self.port,
127
            user=self.user,
128
            password=self.password,
129
            dbname=self.dbname,
130
            sslmode=self.sslmode,
131
            sslrootcert=self.sslrootcert,
132
            sslcert=self.sslcert,
133
            sslkey=self.sslkey
134
        )
135
        # Can only receive notifications when not in transaction, set this for easier usage
136
        conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
1✔
137
        curs = conn.cursor()
1✔
138
        curs.execute(
1✔
139
            f"NOTIFY {self.channel_name},'casbin policy update at {time.time()}'"
140
        )
141
        conn.close()
1✔
142
        return True
1✔
143

144
    def should_reload(self):
1✔
145
        try:
×
NEW
146
            if self.parent_conn.poll():
×
147
                message = self.parent_conn.recv()
×
148
                self.logger.debug(f"message:{message}")
×
NEW
149
                if self.update_callback:
×
NEW
150
                    self.update_callback()
×
151
                return True
×
152
        except EOFError:
×
153
            self.logger.warning(
×
154
                "Child casbin-watcher subscribe process has stopped, "
155
                "attempting to recreate the process in 10 seconds..."
156
            )
157
            self.subscribed_process, self.parent_conn = self.create_subscriber_process(
×
158
                delay=10
159
            )
160
            return False
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc