• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fiduswriter / fiduswriter / 10941406873

19 Sep 2024 12:55PM UTC coverage: 87.088% (-0.007%) from 87.095%
10941406873

Pull #1294

github

web-flow
Merge 0ca2fd0ed into b2c563a85
Pull Request #1294: remove JSONPATCH setting

6374 of 7319 relevant lines covered (87.09%)

4.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.67
fiduswriter/base/consumers.py
1
import json
13✔
2
import atexit
13✔
3

4
from base.models import Presence
13✔
5
from base.base_consumer import BaseWebsocketConsumer
13✔
6

7

8
class SystemMessageConsumer(BaseWebsocketConsumer):
    """Websocket consumer that tracks user presence and relays system messages.

    Every consumer whose ``connect`` succeeds is kept in the class-level
    ``clients`` list, so a ``system_message`` received on one connection can
    be forwarded to every connection handled by this process.
    """

    # Consumers currently connected through this process. Shared between all
    # instances on purpose; always accessed through the class.
    clients = []
    # Class-level default so that ``disconnect`` stays safe when ``connect``
    # returned early, before the instance attribute was assigned.
    presence = False

    def connect(self):
        """Register the connection and create its presence record.

        Returns:
            ``False`` when the parent ``connect`` reports failure (nothing is
            registered in that case), ``True`` otherwise. This mirrors the
            truthy/falsy convention this method applies to the parent's
            return value.
        """
        if not super().connect():
            return False
        SystemMessageConsumer.clients.append(self)
        self.presence = False
        self.headers = dict(self.scope["headers"])
        user_agent = self.headers.get(b"user-agent", b"").decode("utf-8")
        # Connections announcing themselves as "Fidus Writer" get no presence
        # record (presumably server-side clients -- TODO confirm).
        if user_agent != "Fidus Writer":
            self.update_presence(True)
        return True

    def disconnect(self, close_code):
        """Drop the presence record, unregister the consumer and close.

        Safe to call when ``connect`` never completed or when the client
        registry was already reset by ``remove_all_presences``.
        """
        self.update_presence(False)
        try:
            SystemMessageConsumer.clients.remove(self)
        except ValueError:
            # Not registered: ``connect`` returned early or the registry was
            # already reset. There is nothing left to unregister.
            pass
        self.close()

    def update_presence(self, is_connected):
        """Create, re-save or delete this connection's ``Presence`` record.

        Args:
            is_connected: ``True`` to create the record (or save the existing
                one again), ``False`` to delete it if there is one.
        """
        if is_connected:
            if self.presence:
                # Saving again presumably refreshes a timestamp on the
                # record -- verify against the Presence model.
                self.presence.save()
            else:
                host = self.headers.get(b"host", b"").decode("utf-8")
                origin = self.headers.get(b"origin", b"").decode("utf-8")
                # Derive the websocket scheme from the page origin's scheme.
                protocol = "wss://" if origin.startswith("https") else "ws://"
                self.presence = Presence.objects.create(
                    user=self.user,
                    server_url=protocol + host + self.scope["path"],
                )
        elif self.presence:
            self.presence.delete()
            # Forget the deleted instance so that a later cleanup does not
            # try to delete it a second time.
            self.presence = False

    def receive(self, text_data=None):
        """Handle an incoming text frame.

        ``ping`` messages update the presence record; ``system_message``
        messages are forwarded to every registered client. Every non-empty
        frame is then passed on to the parent implementation, whose return
        value is returned. Empty frames are ignored.
        """
        if not text_data:
            return
        data = json.loads(text_data)
        if data["type"] == "ping":
            self.update_presence(True)
        elif data["type"] == "system_message":
            for client in SystemMessageConsumer.clients:
                client.send_message(data)
        return super().receive(text_data)
13✔
52

53

54
def remove_all_presences():
    """Delete the presence record of every registered client.

    Walks the consumers held in ``SystemMessageConsumer.clients``, deletes
    each presence record that is set, and then replaces the registry with a
    fresh empty list.
    """
    for consumer in SystemMessageConsumer.clients:
        if not consumer.presence:
            # This consumer has no presence record to delete.
            continue
        consumer.presence.delete()
    SystemMessageConsumer.clients = []
×
60

61

62
# Clean up the presence records of all registered clients when the
# interpreter terminates normally.
atexit.register(remove_all_presences)
13✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc