• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ReactiveX / RxPY / 19213198233

09 Nov 2025 07:14PM UTC coverage: 93.4% (-0.06%) from 93.462%
19213198233

Pull #743

github

dbrattli
Update docs
Pull Request #743: RxPY v5

1861 of 2033 new or added lines in 31 files covered. (91.54%)

153 existing lines in 13 files now uncovered.

25032 of 26801 relevant lines covered (93.4%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.59
/tests/test_core/test_observer.py
1
from reactivex import Observer
1✔
2
from reactivex.notification import OnCompleted, OnError, OnNext, from_notifier
1✔
3

4

5
class MyObserver(Observer):
1✔
6
    def __init__(self):
1✔
7
        super().__init__()
1✔
8
        self.has_on_next = None
1✔
9
        self.has_on_completed = None
1✔
10
        self.has_on_error = None
1✔
11

12
    def _on_next_core(self, value):
1✔
13
        self.has_on_next = value
1✔
14

15
    def _on_error_core(self, error: Exception):
1✔
16
        self.has_on_error = str(error)
1✔
17

18
    def _on_completed_core(self):
1✔
19
        self.has_on_completed = True
1✔
20

21

22
def test_to_observer_notification_on_next():
1✔
23
    i = 0
1✔
24

25
    def next(n):
1✔
26
        assert i == 0
1✔
27
        assert n.kind == "N"
1✔
28
        assert n.value == 42
1✔
29
        assert not hasattr(n, "exception")
1✔
30
        assert n.has_value
1✔
31

32
    from_notifier(next).on_next(42)
1✔
33

34

35
def test_to_observer_notification_on_error():
1✔
36
    ex = "ex"
1✔
37
    i = 0
1✔
38

39
    def next(n):
1✔
40
        assert i == 0
1✔
41
        assert n.kind == "E"
1✔
42
        assert str(n.exception) == ex
1✔
43
        assert not n.has_value
1✔
44

45
    from_notifier(next).on_error(ex)
1✔
46

47

48
def test_to_observer_notification_completed():
1✔
49
    i = 0
1✔
50

51
    def next(n):
1✔
52
        assert i == 0
1✔
53
        assert n.kind == "C"
1✔
54
        assert not n.has_value
1✔
55

56
    from_notifier(next).on_completed()
1✔
57

58

59
def test_to_notifier_forwards():
1✔
60
    obsn = MyObserver()
1✔
61
    obsn.to_notifier()(OnNext(42))
1✔
62
    assert obsn.has_on_next == 42
1✔
63

64
    ex = "ex"
1✔
65
    obse = MyObserver()
1✔
66
    obse.to_notifier()(OnError(ex))
1✔
67
    assert ex == obse.has_on_error
1✔
68

69
    obsc = MyObserver()
1✔
70
    obsc.to_notifier()(OnCompleted())
1✔
71
    assert obsc.has_on_completed
1✔
72

73

74
def test_create_on_next():
1✔
75
    next = [False]
1✔
76

77
    def on_next(x):
1✔
78
        assert 42 == x
1✔
79
        next[0] = True
1✔
80

81
    res = Observer(on_next)
1✔
82

83
    res.on_next(42)
1✔
84
    assert next[0]
1✔
85
    return res.on_completed()
1✔
86

87

88
def test_create_on_next_has_error():
1✔
89
    ex = "ex"
1✔
90
    next = [False]
1✔
91
    _e = None
1✔
92

93
    def on_next(x):
1✔
94
        assert 42 == x
1✔
95
        next[0] = True
1✔
96

97
    res = Observer(on_next)
1✔
98

99
    res.on_next(42)
1✔
100
    assert next[0]
1✔
101

102
    try:
1✔
103
        res.on_error(ex)
1✔
104
        assert False
×
105
    except Exception as e:
1✔
106
        e_ = e.args[0]
1✔
107

108
    assert ex == e_
1✔
109

110

111
def test_create_on_next_on_completed():
1✔
112
    next = [False]
1✔
113
    completed = [False]
1✔
114

115
    def on_next(x):
1✔
116
        assert 42 == x
1✔
117
        next[0] = True
1✔
118
        return next[0]
1✔
119

120
    def on_completed():
1✔
121
        completed[0] = True
1✔
122
        return completed[0]
1✔
123

124
    res = Observer(on_next, None, on_completed)
1✔
125

126
    res.on_next(42)
1✔
127

128
    assert next[0]
1✔
129
    assert not completed[0]
1✔
130

131
    res.on_completed()
1✔
132

133
    assert completed[0]
1✔
134

135

136
def test_create_on_next_close_has_error():
1✔
137
    e_ = None
1✔
138
    ex = "ex"
1✔
139
    next = [False]
1✔
140
    completed = [False]
1✔
141

142
    def on_next(x):
1✔
143
        assert 42 == x
1✔
144
        next[0] = True
1✔
145

146
    def on_completed():
1✔
147
        completed[0] = True
×
148

149
    res = Observer(on_next, None, on_completed)
1✔
150

151
    res.on_next(42)
1✔
152
    assert next[0]
1✔
153
    assert not completed[0]
1✔
154
    try:
1✔
155
        res.on_error(ex)
1✔
156
        assert False
×
157
    except Exception as e:
1✔
158
        e_ = e.args[0]
1✔
159

160
    assert ex == e_
1✔
161
    assert not completed[0]
1✔
162

163

164
def test_create_on_next_on_error():
1✔
165
    ex = "ex"
1✔
166
    next = [True]
1✔
167
    error = [False]
1✔
168

169
    def on_next(x):
1✔
170
        assert 42 == x
1✔
171
        next[0] = True
1✔
172

173
    def on_error(e):
1✔
174
        assert ex == e
1✔
175
        error[0] = True
1✔
176

177
    res = Observer(on_next, on_error)
1✔
178

179
    res.on_next(42)
1✔
180

181
    assert next[0]
1✔
182
    assert not error[0]
1✔
183

184
    res.on_error(ex)
1✔
185
    assert error[0]
1✔
186

187

188
def test_create_on_next_throw_hit_completed():
1✔
189
    ex = "ex"
1✔
190
    next = [True]
1✔
191
    error = [False]
1✔
192

193
    def on_next(x):
1✔
194
        assert 42 == x
1✔
195
        next[0] = True
1✔
196

197
    def on_error(e):
1✔
198
        assert ex == e
×
UNCOV
199
        error[0] = True
×
200

201
    res = Observer(on_next, on_error)
1✔
202

203
    res.on_next(42)
1✔
204
    assert next[0]
1✔
205
    assert not error[0]
1✔
206

207
    res.on_completed()
1✔
208

209
    assert not error[0]
1✔
210

211

212
def test_create_on_next_throw_close1():
1✔
213
    ex = "ex"
1✔
214
    next = [True]
1✔
215
    error = [False]
1✔
216
    completed = [False]
1✔
217

218
    def on_next(x):
1✔
219
        assert 42 == x
1✔
220
        next[0] = True
1✔
221

222
    def on_error(e):
1✔
UNCOV
223
        assert ex == e
×
UNCOV
224
        error[0] = True
×
225

226
    def on_completed():
1✔
227
        completed[0] = True
1✔
228

229
    res = Observer(on_next, on_error, on_completed)
1✔
230

231
    res.on_next(42)
1✔
232

233
    assert next[0]
1✔
234
    assert not error[0]
1✔
235
    assert not completed[0]
1✔
236

237
    res.on_completed()
1✔
238

239
    assert completed[0]
1✔
240
    assert not error[0]
1✔
241

242

243
def test_create_on_next_throw_close2():
1✔
244
    ex = "ex"
1✔
245
    next = [True]
1✔
246
    error = [False]
1✔
247
    completed = [False]
1✔
248

249
    def on_next(x):
1✔
250
        assert 42 == x
1✔
251
        next[0] = True
1✔
252

253
    def on_error(e):
1✔
254
        assert ex == e
1✔
255
        error[0] = True
1✔
256

257
    def on_completed():
1✔
258
        completed[0] = True
×
259

260
    res = Observer(on_next, on_error, on_completed)
1✔
261

262
    res.on_next(42)
1✔
263

264
    assert next[0]
1✔
265
    assert not error[0]
1✔
266
    assert not completed[0]
1✔
267

268
    res.on_error(ex)
1✔
269

270
    assert not completed[0]
1✔
271
    assert error[0]
1✔
272

273

274
def test_as_observer_hides():
1✔
275
    obs = MyObserver()
1✔
276
    res = obs.as_observer()
1✔
277

278
    assert res != obs
1✔
279
    assert not isinstance(res, obs.__class__)
1✔
280

281

282
def test_as_observer_forwards():
1✔
283
    obsn = MyObserver()
1✔
284
    obsn.as_observer().on_next(42)
1✔
285
    assert obsn.has_on_next == 42
1✔
286

287
    ex = "ex"
1✔
288
    obse = MyObserver()
1✔
289
    obse.as_observer().on_error(ex)
1✔
290
    assert obse.has_on_error == ex
1✔
291

292
    obsc = MyObserver()
1✔
293
    obsc.as_observer().on_completed()
1✔
294
    assert obsc.has_on_completed
1✔
295

296

297
if __name__ == "__main__":
1✔
UNCOV
298
    test_to_notifier_forwards()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc