• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jplana / python-etcd / 6702577276

31 Oct 2023 05:45AM UTC coverage: 92.184%. First build
6702577276

Pull #287

github-actions

web-flow
Merge cc7335f8b into 4babc1f36
Pull Request #287: Test modernization

502 of 502 new or added lines in 14 files covered. (100.0%)

1887 of 2047 relevant lines covered (92.18%)

2.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.52
/src/etcd/tests/integration/test_simple.py
1
import os
3✔
2
import time
3✔
3
import shutil
3✔
4
import logging
3✔
5
import unittest
3✔
6
import multiprocessing
3✔
7
import tempfile
3✔
8

9
import urllib3
3✔
10

11
import etcd
3✔
12
from . import helpers
3✔
13

14
# Root logger (getLogger() is called without a name); the watch tests in this
# module use it for debug output.
log = logging.getLogger()
3✔
15

16

17
class EtcdIntegrationTest(unittest.TestCase):
    """Base class for integration tests that run against local etcd processes.

    ``setUpClass`` locates the ``etcd`` executable on ``PATH``, creates a
    temporary data directory, starts ``cl_size`` etcd instances through
    ``helpers.EtcdProcessHelper`` (client ports from 6001, internal ports
    from 8001) and stores a client connected to port 6001 in ``cls.client``.
    ``tearDownClass`` stops the processes and removes the directory.
    """

    # Number of etcd instances started by setUpClass.
    cl_size = 3

    @classmethod
    def setUpClass(cls):
        """Start the etcd instances and create the shared client.

        Raises:
            Exception: if no executable ``etcd`` is found on ``PATH``, or
                whatever the process helper / client raise on start-up.
        """
        program = cls._get_exe()
        cls.directory = tempfile.mkdtemp(prefix="python-etcd")
        helper = None
        try:
            helper = helpers.EtcdProcessHelper(
                cls.directory,
                proc_name=program,
                port_range_start=6001,
                internal_port_range_start=8001,
            )
            cls.processHelper = helper
            helper.run(number=cls.cl_size)
            cls.client = etcd.Client(port=6001)
        except Exception:
            # unittest does not call tearDownClass when setUpClass raises, so
            # release what was acquired here before re-raising.
            try:
                if helper is not None:
                    helper.stop()
            finally:
                shutil.rmtree(cls.directory, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        """Stop the etcd instances and delete the temporary directory."""
        cls.processHelper.stop()
        shutil.rmtree(cls.directory)

    @classmethod
    def _is_exe(cls, fpath):
        """Return True if ``fpath`` is a regular file the process may execute."""
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    @classmethod
    def _get_exe(cls):
        """Return the path of the first executable ``etcd`` found on ``PATH``.

        Each ``PATH`` entry has surrounding double quotes stripped before it
        is joined with the program name.

        Raises:
            Exception: with message ``"etcd not in path!!"`` when no entry
                contains an executable ``etcd`` (including when ``PATH`` is
                not set at all).
        """
        PROGRAM = "etcd"

        # .get() so that a missing PATH variable produces the descriptive
        # error below rather than a bare KeyError.
        for entry in os.environ.get("PATH", "").split(os.pathsep):
            candidate = os.path.join(entry.strip('"'), PROGRAM)
            if cls._is_exe(candidate):
                return candidate

        raise Exception("etcd not in path!!")
3✔
59

60

61
class TestSimple(EtcdIntegrationTest):
    """Basic read/write/delete/update operations through ``self.client``."""

    def test_machines(self):
        """INTEGRATION: retrieve machines"""
        self.assertEqual(self.client.machines[0], "http://127.0.0.1:6001")

    def test_leader(self):
        """INTEGRATION: retrieve leader"""
        self.assertIn(
            self.client.leader["clientURLs"][0],
            ["http://127.0.0.1:6001", "http://127.0.0.1:6002", "http://127.0.0.1:6003"],
        )

    def test_get_set_delete(self):
        """INTEGRATION: set a new value"""
        # The key must not exist before it is set.  assertRaises is used
        # rather than `assert False` inside a try block, because plain
        # assert statements are removed when Python runs with -O.
        self.assertRaises(etcd.EtcdKeyNotFound, self.client.get, "/test_set")
        self.assertNotIn("/test_set", self.client)

        set_result = self.client.set("/test_set", "test-key")
        self.assertEqual("set", set_result.action.lower())
        self.assertEqual("/test_set", set_result.key)
        self.assertEqual("test-key", set_result.value)

        self.assertIn("/test_set", self.client)

        get_result = self.client.get("/test_set")
        self.assertEqual("get", get_result.action.lower())
        self.assertEqual("/test_set", get_result.key)
        self.assertEqual("test-key", get_result.value)

        delete_result = self.client.delete("/test_set")
        self.assertEqual("delete", delete_result.action.lower())
        self.assertEqual("/test_set", delete_result.key)

        # After deletion the key is gone again.
        self.assertNotIn("/test_set", self.client)
        self.assertRaises(etcd.EtcdKeyNotFound, self.client.get, "/test_set")

    def test_update(self):
        """INTEGRATION: update a value"""
        self.client.set("/foo", 3)
        c = self.client.get("/foo")
        c.value = int(c.value) + 3
        self.client.update(c)
        newres = self.client.get("/foo")
        # The value is read back as a string.
        self.assertEqual(newres.value, "6")
        # Updating again from the same, now outdated, result must be rejected.
        self.assertRaises(ValueError, self.client.update, c)

    def test_retrieve_subkeys(self):
        """INTEGRATION: retrieve multiple subkeys"""
        expected = ["test-key1", "test-key2", "test-key3"]
        self.client.write("/subtree/test_set", expected[0])
        self.client.write("/subtree/test_set1", expected[1])
        self.client.write("/subtree/test_set2", expected[2])
        get_result = self.client.read("/subtree", recursive=True)
        values = [subkey.value for subkey in get_result.leaves]
        # sorted() returns the sorted list; list.sort() returns None, so
        # comparing two .sort() results would always compare None to None.
        self.assertEqual(sorted(expected), sorted(values))

    def test_directory_ttl_update(self):
        """INTEGRATION: should be able to update a dir TTL"""
        self.client.write("/dir", None, dir=True, ttl=30)
        res = self.client.write("/dir", None, dir=True, ttl=31, prevExist=True)
        self.assertEqual(res.ttl, 31)
        res = self.client.get("/dir")
        res.ttl = 120
        new_res = self.client.update(res)
        self.assertEqual(new_res.ttl, 120)
3✔
135

136

137
class TestErrors(EtcdIntegrationTest):
    """Operations that the client is expected to reject with an exception."""

    def test_is_not_a_file(self):
        """INTEGRATION: try to write  value to an existing directory"""
        # Writing a key below /directory makes /directory itself a directory.
        self.client.set("/directory/test-key", "test-value")
        with self.assertRaises(etcd.EtcdNotFile):
            self.client.set("/directory", "test-value")

    def test_test_and_set(self):
        """INTEGRATION: try test_and_set operation"""
        key = "/test-key"
        self.client.set(key, "old-test-value")

        # First swap succeeds: the stored value matches the expected one.
        self.client.test_and_set(key, "test-value", "old-test-value")

        # Second swap still names the old value, which is no longer stored.
        with self.assertRaises(ValueError):
            self.client.test_and_set(key, "new-value", "old-test-value")

    def test_creating_already_existing_directory(self):
        """INTEGRATION: creating an already existing directory without
        `prevExist=True` should fail"""
        self.client.write("/mydir", None, dir=True)

        with self.assertRaises(etcd.EtcdNotFile):
            self.client.write("/mydir", None, dir=True)

        with self.assertRaises(etcd.EtcdAlreadyExist):
            self.client.write("/mydir", None, dir=True, prevExist=False)
173

174

175
class TestClusterFunctions(EtcdIntegrationTest):
    """Reconnection behaviour when etcd instances are killed.

    Unlike the base class, ``setUpClass`` only builds the process helper
    (with ``cluster=True``); it does not start any instance or create a
    client.  Every test stops whatever is running, starts three instances
    and builds its own client.
    """

    @classmethod
    def setUpClass(cls):
        """Create the temp directory and the (not yet started) process helper."""
        program = cls._get_exe()
        cls.directory = tempfile.mkdtemp(prefix="python-etcd")

        cls.processHelper = helpers.EtcdProcessHelper(
            cls.directory,
            proc_name=program,
            port_range_start=6001,
            internal_port_range_start=8001,
            cluster=True,
        )

    def test_reconnect(self):
        """INTEGRATION: get key after the server we're connected fails."""
        self.processHelper.stop()
        self.processHelper.run(number=3)
        self.client = etcd.Client(port=6001, allow_reconnect=True)
        self.client.set("/test_set", "test-key1")
        get_result = self.client.get("/test_set")

        self.assertEqual("test-key1", get_result.value)

        # presumably index 0 is the instance listening on port 6001 -- confirm
        # against helpers.EtcdProcessHelper.kill_one
        self.processHelper.kill_one(0)

        get_result = self.client.get("/test_set")
        self.assertEqual("test-key1", get_result.value)

    def test_reconnect_with_several_hosts_passed(self):
        """INTEGRATION: receive several hosts at connection setup."""
        self.processHelper.stop()
        self.processHelper.run(number=3)
        # NOTE(review): with three instances starting at port 6001, nothing is
        # expected to listen on 6004 -- confirm this is the intent.
        self.client = etcd.Client(
            host=(("127.0.0.1", 6004), ("127.0.0.1", 6001)), allow_reconnect=True
        )
        self.client.set("/test_set", "test-key1")
        get_result = self.client.get("/test_set")

        self.assertEqual("test-key1", get_result.value)

        self.processHelper.kill_one(0)

        get_result = self.client.get("/test_set")
        self.assertEqual("test-key1", get_result.value)

    def test_reconnect_not_allowed(self):
        """INTEGRATION: fail on server kill if not allow_reconnect"""
        self.processHelper.stop()
        self.processHelper.run(number=3)
        self.client = etcd.Client(port=6001, allow_reconnect=False)
        self.processHelper.kill_one(0)
        self.assertRaises(etcd.EtcdConnectionFailed, self.client.get, "/test_set")

    def test_reconnet_fails(self):
        """INTEGRATION: fails to reconnect if no available machines"""
        self.processHelper.stop()
        # Start with three instances (0, 1, 2)
        self.processHelper.run(number=3)
        # Connect to instance 0
        self.client = etcd.Client(port=6001, allow_reconnect=True)
        self.client.set("/test_set", "test-key1")

        get_result = self.client.get("/test_set")
        self.assertEqual("test-key1", get_result.value)
        # Kill every instance so there is nothing left to reconnect to.
        self.processHelper.kill_one(2)
        self.processHelper.kill_one(1)
        self.processHelper.kill_one(0)
        self.assertRaises(etcd.EtcdException, self.client.get, "/test_set")
3✔
244

245

246
class TestWatch(EtcdIntegrationTest):
    """Watch tests: one child process watches a key, another changes it.

    The watcher sends the values it observes back to the test through a
    ``multiprocessing.Queue``.

    NOTE(review): the process targets are functions nested inside the test
    methods.  Nested functions cannot be pickled, so these tests rely on
    child processes being forked -- confirm the multiprocessing start method
    on the platforms this suite runs on.
    """

    def test_watch(self):
        """INTEGRATION: Receive a watch event from other process"""

        self.client.set("/test-key", "test-value")

        queue = multiprocessing.Queue()

        def change_value(key, newValue):
            c = etcd.Client(port=6001)
            c.set(key, newValue)

        def watch_value(key, queue):
            c = etcd.Client(port=6001)
            queue.put(c.watch(key).value)

        changer = multiprocessing.Process(
            target=change_value,
            args=(
                "/test-key",
                "new-test-value",
            ),
        )

        watcher = multiprocessing.Process(target=watch_value, args=("/test-key", queue))

        watcher.start()
        # Delay the change so the watcher process has time to start watching.
        time.sleep(1)

        changer.start()

        value = queue.get(timeout=2)
        watcher.join(timeout=5)
        changer.join(timeout=5)

        # assertEqual rather than a bare assert, which is removed under -O.
        self.assertEqual("new-test-value", value)

    def test_watch_indexed(self):
        """INTEGRATION: Receive a watch event from other process, indexed"""

        self.client.set("/test-key", "test-value")
        set_result = self.client.set("/test-key", "test-value0")
        # Index of the "test-value0" write; the watcher starts from here.
        original_index = int(set_result.modifiedIndex)
        self.client.set("/test-key", "test-value1")
        self.client.set("/test-key", "test-value2")

        queue = multiprocessing.Queue()

        def change_value(key, newValue):
            c = etcd.Client(port=6001)
            c.set(key, newValue)
            c.get(key)

        def watch_value(key, index, queue):
            c = etcd.Client(port=6001)
            for i in range(0, 3):
                queue.put(c.watch(key, index=index + i).value)

        proc = multiprocessing.Process(
            target=change_value,
            args=(
                "/test-key",
                "test-value3",
            ),
        )

        watcher = multiprocessing.Process(
            target=watch_value, args=("/test-key", original_index, queue)
        )

        watcher.start()
        time.sleep(0.5)

        proc.start()

        for i in range(0, 3):
            value = queue.get()
            log.debug("index: %d: %s", i, value)
            self.assertEqual("test-value%d" % i, value)

        watcher.join(timeout=5)
        proc.join(timeout=5)

    def test_watch_generator(self):
        """INTEGRATION: Receive a watch event from other process (gen)"""

        self.client.set("/test-key", "test-value")

        queue = multiprocessing.Queue()

        def change_value(key):
            time.sleep(0.5)
            c = etcd.Client(port=6001)
            for i in range(0, 3):
                c.set(key, "test-value%d" % i)
                c.get(key)

        def watch_value(key, queue):
            c = etcd.Client(port=6001)
            for i in range(0, 3):
                event = next(c.eternal_watch(key)).value
                queue.put(event)

        changer = multiprocessing.Process(target=change_value, args=("/test-key",))

        watcher = multiprocessing.Process(target=watch_value, args=("/test-key", queue))

        watcher.start()
        changer.start()

        values = ["test-value0", "test-value1", "test-value2"]
        # Only the first queued event is checked (range(0, 1)), and it may be
        # any of the three written values.
        for i in range(0, 1):
            value = queue.get()
            log.debug("index: %d: %s", i, value)
            self.assertIn(value, values)

        watcher.join(timeout=5)
        changer.join(timeout=5)

    def test_watch_indexed_generator(self):
        """INTEGRATION: Receive a watch event from other process, ixd, (2)"""

        self.client.set("/test-key", "test-value")
        set_result = self.client.set("/test-key", "test-value0")
        # Index of the "test-value0" write; the watcher starts from here.
        original_index = int(set_result.modifiedIndex)
        self.client.set("/test-key", "test-value1")
        self.client.set("/test-key", "test-value2")

        queue = multiprocessing.Queue()

        def change_value(key, newValue):
            c = etcd.Client(port=6001)
            c.set(key, newValue)

        def watch_value(key, index, queue):
            c = etcd.Client(port=6001)
            iterevents = c.eternal_watch(key, index=index)
            for i in range(0, 3):
                queue.put(next(iterevents).value)

        proc = multiprocessing.Process(
            target=change_value,
            args=(
                "/test-key",
                "test-value3",
            ),
        )

        watcher = multiprocessing.Process(
            target=watch_value, args=("/test-key", original_index, queue)
        )

        watcher.start()
        time.sleep(0.5)
        proc.start()

        for i in range(0, 3):
            value = queue.get()
            log.debug("index: %d: %s", i, value)
            self.assertEqual("test-value%d" % i, value)

        watcher.join(timeout=5)
        proc.join(timeout=5)
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc