• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pycasbin / postgresql-watcher / 9858920551

09 Jul 2024 02:26PM UTC coverage: 76.959%. First build
9858920551

Pull #29

github

web-flow
Merge c053715c6 into 4b808d0b7
Pull Request #29: feat: fixed `should_reload` behaviour, close PostgreSQL connections, block until `PostgresqlWatcher` is ready, refactorings

132 of 177 new or added lines in 4 files covered. (74.58%)

167 of 217 relevant lines covered (76.96%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.81
/tests/test_postgresql_watcher.py
1
from logging import DEBUG, getLogger, StreamHandler
1✔
2
from multiprocessing import connection, context
1✔
3
from time import sleep
1✔
4
from unittest import TestCase, main
1✔
5
from unittest.mock import MagicMock
1✔
6
import sys
1✔
7

8
from postgresql_watcher import PostgresqlWatcher
1✔
9
from postgresql_watcher.casbin_channel_subscription import CASBIN_CHANNEL_SELECT_TIMEOUT
1✔
10

11

12
# Warning: replace the values below with your own PostgreSQL connection settings.
13
HOST = "127.0.0.1"
1✔
14
PORT = 5432
1✔
15
USER = "postgres"
1✔
16
PASSWORD = "123456"
1✔
17
DBNAME = "postgres"
1✔
18

19
logger = getLogger()
1✔
20
logger.level = DEBUG
1✔
21
stream_handler = StreamHandler(sys.stdout)
1✔
22
logger.addHandler(stream_handler)
1✔
23

24

25
def get_watcher(channel_name):
1✔
26
    return PostgresqlWatcher(
1✔
27
        host=HOST,
28
        port=PORT,
29
        user=USER,
30
        password=PASSWORD,
31
        dbname=DBNAME,
32
        logger=logger,
33
        channel_name=channel_name,
34
    )
35

36

37
try:
1✔
38
    import _winapi
1✔
39
    from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
×
40
except ImportError as e:
1✔
41
    if sys.platform == "win32":
1✔
NEW
42
        raise e
×
43
    _winapi = None
1✔
44

45

46
class TestConfig(TestCase):
1✔
47
    def test_pg_watcher_init(self):
1✔
48
        pg_watcher = get_watcher("test_pg_watcher_init")
1✔
49
        if _winapi:
1✔
50
            assert isinstance(pg_watcher.parent_conn, connection.PipeConnection)
×
51
        else:
52
            assert isinstance(pg_watcher.parent_conn, connection.Connection)
1✔
53
        assert isinstance(pg_watcher.subscription_proces, context.Process)
1✔
54

55
    def test_update_single_pg_watcher(self):
1✔
56
        pg_watcher = get_watcher("test_update_single_pg_watcher")
1✔
57
        pg_watcher.update()
1✔
58
        sleep(CASBIN_CHANNEL_SELECT_TIMEOUT * 2)
1✔
59
        self.assertTrue(pg_watcher.should_reload())
1✔
60

61
    def test_no_update_single_pg_watcher(self):
1✔
62
        pg_watcher = get_watcher("test_no_update_single_pg_watcher")
1✔
63
        sleep(CASBIN_CHANNEL_SELECT_TIMEOUT * 2)
1✔
64
        self.assertFalse(pg_watcher.should_reload())
1✔
65

66
    def test_update_mutiple_pg_watcher(self):
1✔
67
        channel_name = "test_update_mutiple_pg_watcher"
1✔
68
        main_watcher = get_watcher(channel_name)
1✔
69

70
        other_watchers = [get_watcher(channel_name) for _ in range(5)]
1✔
71
        main_watcher.update()
1✔
72
        sleep(CASBIN_CHANNEL_SELECT_TIMEOUT * 2)
1✔
73
        for watcher in other_watchers:
1✔
74
            self.assertTrue(watcher.should_reload())
1✔
75

76
    def test_no_update_mutiple_pg_watcher(self):
1✔
77
        channel_name = "test_no_update_mutiple_pg_watcher"
1✔
78
        main_watcher = get_watcher(channel_name)
1✔
79

80
        other_watchers = [get_watcher(channel_name) for _ in range(5)]
1✔
81
        sleep(CASBIN_CHANNEL_SELECT_TIMEOUT * 2)
1✔
82
        for watcher in other_watchers:
1✔
83
            self.assertFalse(watcher.should_reload())
1✔
84
        self.assertFalse(main_watcher.should_reload())
1✔
85

86
    def test_update_handler_called(self):
1✔
87
        channel_name = "test_update_handler_called"
1✔
88
        main_watcher = get_watcher(channel_name)
1✔
89
        handler = MagicMock()
1✔
90
        main_watcher.set_update_callback(handler)
1✔
91
        main_watcher.update()
1✔
92
        sleep(CASBIN_CHANNEL_SELECT_TIMEOUT * 2)
1✔
93
        self.assertTrue(main_watcher.should_reload())
1✔
94
        self.assertTrue(handler.call_count == 1)
1✔
95

96
    def test_update_handler_not_called(self):
1✔
97
        channel_name = "test_update_handler_not_called"
1✔
98
        main_watcher = get_watcher(channel_name)
1✔
99
        handler = MagicMock()
1✔
100
        main_watcher.set_update_callback(handler)
1✔
101
        sleep(CASBIN_CHANNEL_SELECT_TIMEOUT * 2)
1✔
102
        self.assertFalse(main_watcher.should_reload())
1✔
103
        self.assertTrue(handler.call_count == 0)
1✔
104

105

106
if __name__ == "__main__":
1✔
NEW
107
    main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc