• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

javipalanca / spade / 11780874355

11 Nov 2024 02:54PM UTC coverage: 95.463% (+14.0%) from 81.429%
11780874355

push

github

1031 of 1080 relevant lines covered (95.46%)

2.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.5
/spade/presence.py
1
from enum import Enum
3✔
2
from typing import Dict, Optional, Union
3✔
3

4
from slixmpp import JID
3✔
5
from slixmpp.stanza import Presence
3✔
6

7

8
class ContactNotFound(Exception):
    """Raised when a lookup is made for a JID that is not in the contact table."""
3✔
10

11

3✔
12
class PresenceNotFound(Exception):
    """Raised when a contact has no presence information recorded yet."""
14

15

3✔
16
class PresenceShow(Enum):
    """Values accepted for the ``show`` field of a presence.

    ``NONE`` is the placeholder used when a presence carries no show value.
    """

    EXTENDED_AWAY = "xa"
    AWAY = "away"
    CHAT = "chat"
    DND = "dnd"
    NONE = "none"
22

23

3✔
24
class PresenceType(Enum):
    """Values accepted for the ``type`` field of a presence."""

    # Availability announcements.
    AVAILABLE = "available"
    UNAVAILABLE = "unavailable"
    # Error and probe stanzas.
    ERROR = "error"
    PROBE = "probe"
    # Subscription management.
    SUBSCRIBE = "subscribe"
    SUBSCRIBED = "subscribed"
    UNSUBSCRIBE = "unsubscribe"
    UNSUBSCRIBED = "unsubscribed"
33

34

3✔
35
class PresenceInfo:
    """Snapshot of a presence: its type, show value, status text and priority.

    Two instances compare equal when all four fields are equal.  Since
    ``__eq__`` is overridden and ``__hash__`` is not, instances are unhashable.
    """

    def __init__(
        self,
        presence_type: PresenceType,
        show: PresenceShow,
        status: Optional[str] = "",
        priority: int = 0,
    ):
        """Store the four presence fields.

        Args:
            presence_type: Kind of presence; stored as ``self.type``.
            show: Show value of the presence.
            status: Free-form status text, or ``None``.
            priority: Priority of the presence.
        """
        self.type = presence_type
        self.show = show
        self.status = status
        self.priority = priority

    def is_available(self) -> bool:
        """Return ``True`` when the type is ``PresenceType.AVAILABLE``."""
        return self.type == PresenceType.AVAILABLE

    def __eq__(self, other):
        """Compare field by field; anything that is not a PresenceInfo is unequal."""
        if not isinstance(other, PresenceInfo):
            return False
        return (
            self.type == other.type
            and self.show == other.show
            and self.status == other.status
            and self.priority == other.priority
        )

    def __ne__(self, other):
        """Return the negation of ``__eq__``."""
        return not self.__eq__(other)

    def __str__(self):
        """Return a one-line, human-readable description of the four fields."""
        return f"PresenceInfo(Type: {self.type}, Show: {self.show}, Status: {self.status}, Priority: {self.priority})"

    def __repr__(self) -> str:
        """Return the same text as ``__str__``."""
        return str(self)
3✔
69

70

3✔
71
class Contact:
    """Roster entry for a peer together with the presence of each of its resources."""

    def __init__(self, jid: JID, name: str, subscription: str, ask: str, groups: list):
        """Create a contact with no presence information.

        Args:
            jid: JID of the contact.
            name: Display name of the contact.
            subscription: Subscription state string (e.g. "none", "to", "from", "both").
            ask: Pending-request marker (e.g. "subscribe" or an empty string).
            groups: Roster groups the contact belongs to.
        """
        self.jid = jid
        self.name = name
        self.subscription = subscription
        self.ask = ask
        self.groups = groups
        # Latest presence reported by each resource, keyed by resource name.
        self.resources: Dict[str, PresenceInfo] = {}
        # Presence currently representing the contact, and the one it replaced.
        self.current_presence: Optional[PresenceInfo] = None
        self.last_presence: Optional[PresenceInfo] = None

    def update_presence(self, resource: str, presence_info: PresenceInfo):
        """Record *presence_info* for *resource* and re-elect the current presence.

        Nothing happens when the resource already holds an equal presence.
        Otherwise the presence is stored and the current presence is re-selected:
        available presences are preferred over unavailable ones, and ties are
        broken by the highest priority.  When the selection changes, the previous
        current presence is moved to ``last_presence``.

        Args:
            resource: Resource the presence was received from.
            presence_info: The presence reported by that resource.
        """
        if presence_info == self.resources.get(resource):
            return
        self.resources[resource] = presence_info
        # Rank availability before priority so that a resource that went
        # offline cannot shadow another resource that is still online.
        new_presence = max(
            self.resources.values(),
            key=lambda p: (p.is_available(), p.priority),
            default=None,
        )
        if new_presence != self.current_presence:
            self.last_presence = self.current_presence
            self.current_presence = new_presence

    def get_presence(self, resource: Optional[str] = None) -> PresenceInfo:
        """Return the presence of *resource*, or the current presence if omitted.

        Args:
            resource: Resource name to look up; a falsy value selects the
                contact's current presence.

        Raises:
            KeyError: If *resource* is given but has no recorded presence.
            PresenceNotFound: If no resource is given and the contact has no
                current presence.
        """
        if resource:
            if resource in self.resources:
                return self.resources[resource]
            else:
                raise KeyError(
                    f"Resource '{resource}' not found for contact {self.jid}."
                )
        if self.current_presence:
            return self.current_presence
        raise PresenceNotFound(
            f"No presence information available for contact {self.jid}."
        )

    def update_subscription(self, subscription: str, ask: str):
        """Overwrite the subscription state and the ask marker."""
        self.subscription = subscription
        self.ask = ask

    def is_available(self) -> bool:
        """Return ``True`` when a current presence exists and its type is AVAILABLE."""
        return (
            self.current_presence is not None
            and self.current_presence.type == PresenceType.AVAILABLE
        )

    def is_subscribed(self) -> bool:
        """Return ``True`` when subscription is "both" or "to", or ask is "subscribe"."""
        return self.subscription in ["both", "to"] or self.ask == "subscribe"

    def __str__(self):
        """Return a one-line description with JID, name and current presence."""
        return f"Contact(JID: {self.jid}, Name: {self.name}, Presence: {self.current_presence})"

    def __repr__(self) -> str:
        """Return the same text as ``__str__``."""
        return str(self)
126

3✔
127

3✔
128
class PresenceManager:
    """Tracks the agent's own presence and the presence/subscription of its contacts.

    Contacts are stored in ``self.contacts`` keyed by bare JID string.  The
    ``on_*`` methods are no-op hooks meant to be overridden by users.
    """

    def __init__(self, agent, approve_all: bool = False):
        """Register the presence, subscription and roster handlers on the agent's client.

        Args:
            agent: Owner of this manager; its ``client`` and ``jid`` are used.
            approve_all: When true, incoming subscription requests are
                approved automatically.
        """
        self.contacts: Dict[str, Contact] = {}
        self.agent = agent
        self.current_presence: Optional[PresenceInfo] = None
        self.approve_all = approve_all
        # Adding event handlers to handle incoming presence and subscription events
        self.agent.client.add_event_handler("presence_available", self.handle_presence)
        self.agent.client.add_event_handler(
            "presence_unavailable", self.handle_presence
        )
        self.agent.client.add_event_handler("changed_status", self.handle_presence)
        self.agent.client.add_event_handler(
            "presence_subscribe", self.handle_subscription
        )
        self.agent.client.add_event_handler(
            "presence_subscribed", self.handle_subscription
        )
        self.agent.client.add_event_handler(
            "presence_unsubscribe", self.handle_subscription
        )
        self.agent.client.add_event_handler(
            "presence_unsubscribed", self.handle_subscription
        )
        self.agent.client.add_event_handler("roster_update", self.handle_roster_update)

    @staticmethod
    def _bare_jid(jid: Union[str, JID]) -> str:
        """Return the bare form of *jid*, which is how contacts are keyed."""
        if isinstance(jid, JID):
            return jid.bare
        return JID(jid).bare

    def is_available(self) -> bool:
        """Return ``True`` when the agent has a presence set and it is available."""
        return (
            self.current_presence is not None and self.current_presence.is_available()
        )

    def get_presence(self) -> Optional[PresenceInfo]:
        """Return the agent's own presence, or ``None`` if none has been set."""
        return self.current_presence

    def get_show(self) -> PresenceShow:
        """Return the agent's show value, or ``PresenceShow.NONE`` without a presence."""
        return (
            self.current_presence.show if self.current_presence else PresenceShow.NONE
        )

    def get_status(self) -> Optional[str]:
        """Return the agent's status text, or ``None`` without a presence."""
        return self.current_presence.status if self.current_presence else None

    def get_priority(self) -> int:
        """Return the agent's priority, or ``0`` without a presence."""
        return self.current_presence.priority if self.current_presence else 0

    def handle_presence(self, presence: Presence):
        """Record an incoming presence and dispatch the user hooks.

        Presences coming from the agent's own bare JID are ignored.  Unknown
        senders are added to the contact table on the fly.

        Args:
            presence: The received presence stanza.
        """
        jid = presence["from"]
        peer_jid = str(jid)
        bare_jid = jid.bare
        if bare_jid == self.agent.jid.bare:
            return
        resource = jid.resource
        presence_type = presence["type"]
        # Normalise the value of `type` if it is a show
        if presence_type in [show.value for show in PresenceShow]:
            presence_type = PresenceType.AVAILABLE
        presence_type = PresenceType(presence_type)

        show = PresenceShow(presence.get("show", "none"))
        status = presence.get("status")
        priority = int(presence.get("priority", 0))

        presence_info = PresenceInfo(presence_type, show, status, priority)
        if bare_jid not in self.contacts:
            # Create a new contact if it doesn't exist.  The stanza's `name`
            # attribute is the XML tag name, not a nickname, so the sender's
            # JID is used as the placeholder name.
            self.contacts[bare_jid] = Contact(
                jid=JID(peer_jid), name=peer_jid, subscription="none", ask="", groups=[]
            )
        # Update the presence of the contact
        contact = self.contacts[bare_jid]
        contact.update_presence(resource, presence_info)
        # Call user-defined handler
        if presence_type == PresenceType.AVAILABLE:
            self.on_available(peer_jid, presence_info, contact.last_presence)
        elif presence_type == PresenceType.UNAVAILABLE:
            self.on_unavailable(peer_jid, presence_info, contact.last_presence)

        self.on_presence_received(presence)

    def handle_subscription(self, presence: Presence):
        """Process a subscribe/subscribed/unsubscribe/unsubscribed stanza.

        Unknown senders are added to the contact table.  A "subscribe" request
        is approved automatically when ``approve_all`` is set; in every case
        the matching ``on_*`` hook is called.

        Args:
            presence: The received subscription stanza.
        """
        peer_jid = presence["from"].bare
        subscription_type = presence["type"]
        ask = presence.get("ask", "none")

        if peer_jid not in self.contacts:
            # Create a new contact if it doesn't exist
            self.contacts[peer_jid] = Contact(
                jid=JID(peer_jid),
                name=peer_jid,
                subscription="none",
                ask=ask,
                groups=[],
            )

        # Call user-defined handler or automatically approve if approve_all is True
        if subscription_type == "subscribe":
            if self.approve_all:
                self.approve_subscription(peer_jid)
            self.on_subscribe(peer_jid)
        elif subscription_type == "subscribed":
            self.subscribed(peer_jid)
            self.on_subscribed(peer_jid)
        elif subscription_type == "unsubscribe":
            self.on_unsubscribe(peer_jid)
        elif subscription_type == "unsubscribed":
            self.unsubscribed(peer_jid)
            self.on_unsubscribed(peer_jid)

    def handle_roster_update(self, event):
        """Executed when the roster is received or updated."""

        roster = event["roster"]
        for item in roster:
            bare_jid = item.get_jid().bare
            name = item.get("name", bare_jid)
            subscription = item.get("subscription", "none")
            ask = item.get("ask", "none")
            groups = item.get_groups()

            # Storing contact information in the internal structure
            contact = self.contacts.get(bare_jid)
            if contact is None:
                # Wrap in JID so Contact.jid has the same type as contacts
                # created by the presence and subscription handlers.
                self.contacts[bare_jid] = Contact(
                    jid=JID(bare_jid),
                    name=name,
                    subscription=subscription,
                    ask=ask,
                    groups=groups,
                )
            else:
                contact.name = name
                contact.subscription = subscription
                contact.ask = ask
                contact.groups = groups

    def get_contact_presence(
        self, jid: Union[str, JID], resource: Optional[str] = None
    ) -> PresenceInfo:
        """Return the presence of a contact, optionally for one resource.

        Args:
            jid: JID of the contact; only its bare part is used for the lookup.
            resource: Resource to query; the contact's current presence is
                returned when omitted.

        Raises:
            ContactNotFound: If the bare JID is not in the contact table.
        """
        jid = self._bare_jid(jid)
        if jid in self.contacts:
            return self.contacts[jid].get_presence(resource)
        else:
            raise ContactNotFound(f"Contact with JID '{jid}' not found.")

    def get_contact(self, jid: Union[str, JID]) -> Contact:
        """Return the contact stored under the bare form of *jid*.

        Raises:
            ContactNotFound: If the bare JID is not in the contact table.
        """
        jid = self._bare_jid(jid)
        if jid in self.contacts:
            return self.contacts[jid]
        else:
            raise ContactNotFound(f"Contact with JID '{jid}' not found.")

    def get_contacts(self) -> Dict[str, Contact]:
        """Return the contacts for which ``is_subscribed()`` is true, keyed by bare JID."""
        return {jid: c for jid, c in self.contacts.items() if c.is_subscribed()}

    def set_presence(
        self,
        presence_type: PresenceType = PresenceType.AVAILABLE,
        show: PresenceShow = PresenceShow.CHAT,
        status: Optional[str] = "",
        priority: int = 0,
    ):
        """Set the agent's own presence and send it through the client.

        Args:
            presence_type: Type of the presence to announce.
            show: Show value to announce.
            status: Status text to announce.
            priority: Priority to announce; sent as a string.
        """
        # This method could be used to set the presence for the local user
        self.current_presence = PresenceInfo(presence_type, show, status, priority)
        # Send the presence stanza to the server
        self.agent.client.send_presence(
            ptype=presence_type.value,
            pshow=show.value,
            pstatus=status,
            ppriority=str(priority),
        )

    def set_available(self):
        """Announce an available presence with the default show, status and priority."""
        self.set_presence(PresenceType.AVAILABLE)

    def set_unavailable(self):
        """Announce an unavailable presence with no show, no status and priority 0."""
        self.set_presence(PresenceType.UNAVAILABLE, PresenceShow.NONE, None, 0)

    def subscribe(self, jid: str):
        """Send a subscription request to *jid* and record it locally.

        The contact (created if missing, keyed by bare JID) is marked with
        ask "subscribe"; its subscription stays "from" if it already was
        "from" and is set to "to" otherwise.
        """
        key = self._bare_jid(jid)
        contact = self.contacts.get(key)
        if contact is None:
            self.contacts[key] = Contact(
                jid=JID(jid), name=jid, subscription="to", ask="subscribe", groups=[]
            )
        elif contact.subscription == "from":
            contact.update_subscription("from", "subscribe")
        else:
            contact.update_subscription("to", "subscribe")
        # Send the subscription stanza to the server
        self.agent.client.send_presence(pto=jid, ptype="subscribe")

    def subscribed(self, jid: str):
        """Update local state after *jid* accepted our subscription request.

        An unknown contact is created with subscription "to".  A known contact
        with ask "subscribe" moves from "none" to "to" or from "from" to
        "both"; any other state is left untouched.
        """
        key = self._bare_jid(jid)
        contact = self.contacts.get(key)
        if contact is None:
            self.contacts[key] = Contact(
                jid=JID(jid), name=jid, subscription="to", ask="", groups=[]
            )
        elif contact.ask == "subscribe":
            if contact.subscription == "none":
                contact.update_subscription("to", "")
            elif contact.subscription == "from":
                contact.update_subscription("both", "")

    def unsubscribe(self, jid: str):
        """Send an unsubscribe request to *jid* and downgrade the local state.

        A known contact moves from "both" to "from" or from "to" to "none".
        The stanza is sent even when the contact is unknown.
        """
        contact = self.contacts.get(self._bare_jid(jid))
        if contact is not None:
            if contact.subscription == "both":
                contact.update_subscription("from", "")
            elif contact.subscription == "to":
                contact.update_subscription("none", "")
        # Send the unsubscription stanza to the server
        self.agent.client.send_presence(pto=jid, ptype="unsubscribe")

    def unsubscribed(self, jid: str):
        """Update local state after receiving "unsubscribed" from *jid*.

        An unknown contact is created with subscription "none".  A known
        contact moves from "both" to "to" or from "from" to "none".
        """
        key = self._bare_jid(jid)
        contact = self.contacts.get(key)
        if contact is None:
            self.contacts[key] = Contact(
                jid=JID(jid), name=jid, subscription="none", ask="", groups=[]
            )
        elif contact.subscription == "both":
            contact.update_subscription("to", "")
        elif contact.subscription == "from":
            contact.update_subscription("none", "")

    def approve_subscription(self, jid: str):
        """Approve a subscription request from *jid* and send "subscribed".

        A known contact moves from "to" to "both", or to "from" from any other
        state.  The stanza is sent even when the contact is unknown.
        """
        contact = self.contacts.get(self._bare_jid(jid))
        if contact is not None:
            if contact.subscription == "to":
                contact.update_subscription("both", "")
            else:
                contact.update_subscription("from", "")
        # Send the subscribed stanza to the server
        self.agent.client.send_presence(pto=jid, ptype="subscribed")

    # User-overridable methods
    def on_subscribe(self, peer_jid: str):
        """Hook called when *peer_jid* asks to subscribe; does nothing by default."""
        pass

    def on_subscribed(self, peer_jid: str):
        """Hook called when *peer_jid* accepted a subscription; does nothing by default."""
        pass

    def on_unsubscribe(self, peer_jid: str):
        """Hook called when *peer_jid* asks to unsubscribe; does nothing by default."""
        pass

    def on_unsubscribed(self, peer_jid: str):
        """Hook called when an "unsubscribed" arrives from *peer_jid*; does nothing by default."""
        pass

    def on_presence_received(self, presence: Presence):
        """Hook called after every handled presence stanza; does nothing by default."""
        pass

    def on_available(
        self,
        peer_jid: str,
        presence_info: PresenceInfo,
        last_presence: Optional[PresenceInfo],
    ):
        """Hook called when a contact's resource reports an available presence.

        Args:
            peer_jid: Full JID string of the sender.
            presence_info: The presence just received.
            last_presence: The contact's ``last_presence`` after the update.
        """
        pass

    def on_unavailable(
        self,
        peer_jid: str,
        presence_info: PresenceInfo,
        last_presence: Optional[PresenceInfo],
    ):
        """Hook called when a contact's resource reports an unavailable presence.

        Args:
            peer_jid: Full JID string of the sender.
            presence_info: The presence just received.
            last_presence: The contact's ``last_presence`` after the update.
        """
        pass
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc