• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ringcentral / ringcentral-python / 139

pending completion
139

push

travis-ci-com

DaKingKong
increase version number

550 of 615 relevant lines covered (89.43%)

1.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.13
/ringcentral/subscription/subscription.py
1
#!/usr/bin/env python
2
# encoding: utf-8
3
import json
2✔
4
import base64
2✔
5
from threading import Timer
2✔
6
from observable import Observable
2✔
7
from .events import Events
2✔
8
from ..core import tostr, clean_decrypted
2✔
9

10
RENEW_HANDICAP = 60
2✔
11
stripped = lambda s: ''.join([c for c in s if ord(c) > 31 or ord(c) == 9])
2✔
12

13
class Subscription(Observable):
2✔
14
    def __init__(self, platform):
2✔
15
        Observable.__init__(self)
2✔
16
        self._platform = platform
2✔
17
        self._event_filters = []
2✔
18
        self._timeout = None
2✔
19
        self._subscription = {
2✔
20
            'eventFilters': [],
21
            'expirationTime': '',  # 2014-03-12T19:54:35.613Z
22
            'expiresIn': 0,
23
            'deliveryMode': {
24
                'transportType': 'PubNub',
25
                'encryption': False,
26
                'address': '',
27
                'subscriberKey': '',
28
                'secretKey': ''
29
            },
30
            'id': '',
31
            'creationTime': '',  # 2014-03-12T19:54:35.613Z
32
            'status': '',  # Active
33
            'uri': ''
34
        }
35
        self._pubnub = None
2✔
36

37
    def pubnub(self):
2✔
38
        return self._pubnub
2✔
39

40
    def register(self, events=None):
2✔
41
        if self.alive():
2✔
42
            return self.renew(events=events)
×
43
        else:
44
            return self.subscribe(events=events)
2✔
45

46
    def add_events(self, events):
2✔
47
        self._event_filters += events
2✔
48
        pass
2✔
49

50
    def set_events(self, events):
2✔
51
        self._event_filters = events
2✔
52

53
    def subscribe(self, events=None):
2✔
54

55
        if events:
2✔
56
            self.set_events(events)
2✔
57

58
        if not self._event_filters or len(self._event_filters) == 0:
2✔
59
            raise Exception('Events are undefined')
2✔
60

61
        try:
2✔
62
            response = self._platform.post('/restapi/v1.0/subscription', body={
2✔
63
                'eventFilters': self._get_full_events_filter(),
64
                'deliveryMode': {
65
                    'transportType': 'PubNub'
66
                }
67
            })
68

69
            self.set_subscription(response.json_dict())
2✔
70
            self._subscribe_at_pubnub()
2✔
71
            self.trigger(Events.subscribeSuccess, response)
2✔
72

73
            return response
2✔
74

75
        except Exception as e:
2✔
76
            self.reset()
2✔
77
            self.trigger(Events.subscribeError, e)
2✔
78
            raise
2✔
79

80
    def renew(self, events=None):
2✔
81

82
        if events:
2✔
83
            self.set_events(events)
×
84

85
        if not self.alive():
2✔
86
            raise Exception('Subscription is not alive')
×
87

88
        if not self._event_filters or len(self._event_filters) == 0:
2✔
89
            raise Exception('Events are undefined')
×
90

91
        self._clear_timeout()
2✔
92

93
        try:
2✔
94
            response = self._platform.put('/restapi/v1.0/subscription/' + self._subscription['id'], body={
2✔
95
                'eventFilters': self._get_full_events_filter()
96
            })
97

98
            self.set_subscription(response.json_dict())
2✔
99
            self.trigger(Events.renewSuccess, response)
2✔
100

101
            return response
2✔
102

103
        except Exception as e:
×
104
            self.reset()
×
105
            self.trigger(Events.renewError, e)
×
106
            raise
×
107

108
    def remove(self):
2✔
109
        if not self.alive():
2✔
110
            raise Exception('Subscription is not alive')
×
111

112
        try:
2✔
113
            response = self._platform.delete('/restapi/v1.0/subscription/' + self._subscription['id'])
2✔
114

115
            self.reset()
2✔
116
            self.trigger(Events.removeSuccess, response)
2✔
117

118
            return response
2✔
119

120
        except Exception as e:
2✔
121
            self.reset()
2✔
122
            self.trigger(Events.removeError, e)
2✔
123
            raise
2✔
124

125
    def alive(self):
2✔
126
        s = self._subscription
2✔
127
        return s and \
2✔
128
               ('deliveryMode' in s and s['deliveryMode']) and \
129
               ('subscriberKey' in s['deliveryMode'] and s['deliveryMode']['subscriberKey']) and \
130
               ('address' in s['deliveryMode'] and s['deliveryMode']['address'])
131

132
    def subscription(self):
2✔
133
        return self._subscription
×
134

135
    def set_subscription(self, data):
2✔
136
        self._clear_timeout()
2✔
137
        self._subscription = data
2✔
138
        self._set_timeout()
2✔
139

140
    def reset(self):
2✔
141
        self._clear_timeout()
2✔
142
        self._unsubscribe_at_pubnub()
2✔
143
        self._subscription = None
2✔
144

145
    def destroy(self):
2✔
146
        self.reset()
2✔
147
        self.off()
2✔
148

149
    def _subscribe_at_pubnub(self):
2✔
150
        if not self.alive():
2✔
151
            raise Exception('Subscription is not alive')
×
152

153
        from pubnub.pubnub import PubNub
2✔
154
        from pubnub.pnconfiguration import PNConfiguration
2✔
155
        from pubnub.callbacks import SubscribeCallback
2✔
156
        from pubnub.enums import PNStatusCategory
2✔
157

158
        pnconf = PNConfiguration()
2✔
159
        pnconf.subscribe_key = self._subscription['deliveryMode']['subscriberKey']
2✔
160
        self._pubnub = PubNub(pnconf)
2✔
161

162
        subscription = self
2✔
163

164
        class SubscribeCallbackImpl(SubscribeCallback):
2✔
165
            def presence(self, pubnub, presence):
2✔
166
                pass  # handle incoming presence data
×
167

168
            def status(self, pubnub, status):
2✔
169
                if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory:
2✔
170
                    subscription.trigger(Events.connectionError, 'Connectivity loss')
×
171
                    pass
1✔
172

173
            def message(self, pubnub, pnmessage):  # instance of PNMessageResult
2✔
174
                subscription._notify(pnmessage.message)
×
175

176
        self._pubnub.add_listener(SubscribeCallbackImpl())
2✔
177
        self._pubnub.subscribe().channels(self._subscription['deliveryMode']['address']).execute()
2✔
178

179
    def _notify(self, message):
2✔
180
        message = self._decrypt(message)
2✔
181
        self.trigger(Events.notification, message)
2✔
182

183
    def _decrypt(self, message):
2✔
184
        if not self.alive():
2✔
185
            raise Exception('Subscription is not alive')
×
186

187
        from Crypto.Cipher import AES
2✔
188

189
        delivery_mode = self._subscription['deliveryMode']
2✔
190
        is_encrypted = ('encryption' in delivery_mode) and ('encryptionKey' in delivery_mode)
2✔
191

192
        if is_encrypted:
2✔
193
            key = base64.b64decode(self._subscription['deliveryMode']['encryptionKey'])
2✔
194
            data = base64.b64decode(message)
2✔
195
            cipher = AES.new(key, AES.MODE_ECB)
2✔
196
            decrypted = clean_decrypted(tostr(cipher.decrypt(data)))
2✔
197
            message = stripped(decrypted)
2✔
198
            message = json.loads(decrypted)
2✔
199

200
        return message
2✔
201

202
    def _unsubscribe_at_pubnub(self):
2✔
203
        if self._pubnub and self.alive():
2✔
204
            self._pubnub.unsubscribe().channels(self._subscription['deliveryMode']['address']).execute()
2✔
205

206
    def _get_full_events_filter(self):
2✔
207
        return [self._platform.create_url(e) for e in self._event_filters]
2✔
208

209
    def _set_timeout(self):
2✔
210
        time_to_expiration = self._subscription['expiresIn'] - RENEW_HANDICAP
2✔
211
        self._timeout = Timer(time_to_expiration, self.renew)
2✔
212
        self._timeout.start()
2✔
213

214
    def _clear_timeout(self):
2✔
215
        if self._timeout:
2✔
216
            self._timeout.cancel()
2✔
217

218

219
if __name__ == '__main__':
220
    pass
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc