• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uw-it-aca / canvas-analytics / 11896297726

18 Nov 2024 04:03PM UTC coverage: 93.756% (-0.05%) from 93.806%
11896297726

Pull #266

github

web-flow
Merge c6333a03c into 509b1685c
Pull Request #266: update default file setting

3799 of 4052 relevant lines covered (93.76%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.75
/data_aggregator/tests/test_threading.py
1
# Copyright 2024 UW-IT, University of Washington
2
# SPDX-License-Identifier: Apache-2.0
3

4

5
import queue
1✔
6
import unittest
1✔
7
from django.test import TestCase
1✔
8
from multiprocessing import Queue
1✔
9
from data_aggregator.threads import ThreadPool, PersistentThread
1✔
10
from mock import MagicMock
1✔
11

12

13
class TestThreadPool(TestCase):
1✔
14

15
    test_queue = Queue()
1✔
16

17
    def _get_processed_values(self, num_processed_values):
1✔
18
        processed_values = []
1✔
19
        while len(processed_values) < num_processed_values:
1✔
20
            try:
1✔
21
                processed_values.append(
1✔
22
                    TestThreadPool.test_queue.get_nowait()
23
                )
24
            except queue.Empty:
×
25
                continue
×
26
        return processed_values
1✔
27

28
    def run_job(self, value):
1✔
29
        value += 1
1✔
30
        TestThreadPool.test_queue.put_nowait(value)
1✔
31

32
    def test_map(self):
1✔
33
        # as context manager
34
        with ThreadPool(processes=2) as pool:
1✔
35
            pool.map(self.run_job, [1, 2, 3, 4, 5])
1✔
36
            self.assertEqual(len(pool.threads), 2)
1✔
37
        # read all processed values
38
        processed_values = self._get_processed_values(5)
1✔
39
        # assert that values were processed
40
        self.assertEqual([2, 3, 4, 5, 6],
1✔
41
                         sorted(processed_values))
42

43
        # as normal class instance
44
        pool = ThreadPool(processes=3)
1✔
45
        self.assertEqual(len(pool.threads), 3)
1✔
46
        pool.map(self.run_job, [3, 2, 4, 2, 1])
1✔
47
        # read all processed values
48
        processed_values = self._get_processed_values(5)
1✔
49
        # assert that values were processed
50
        self.assertEqual([2, 3, 3, 4, 5],
1✔
51
                         sorted(processed_values))
52

53
        # test where there are less values than the thread pool size
54
        pool = ThreadPool(processes=20)
1✔
55
        self.assertEqual(len(pool.threads), 20)
1✔
56
        pool.map(self.run_job, [9, 4, 3, 1, 1])
1✔
57
        # read all processed values
58
        processed_values = self._get_processed_values(5)
1✔
59
        # assert that values were processed
60
        self.assertEqual([2, 2, 4, 5, 10],
1✔
61
                         sorted(processed_values))
62

63
        # test where job raises an uncaught exception
64
        bad_job = MagicMock()
1✔
65
        pool = ThreadPool(processes=2)
1✔
66
        self.assertEqual(pool.processes, 2)
1✔
67
        self.assertEqual(len(pool.threads), 2)
1✔
68
        pool.get_dead_threads = MagicMock()
1✔
69
        t1 = PersistentThread()
1✔
70
        self.assertEqual(t1.is_alive(), False)
1✔
71
        t2 = PersistentThread()
1✔
72
        self.assertEqual(t2.is_alive(), False)
1✔
73
        pool.get_dead_threads.return_value = [t1, t2]
1✔
74
        pool.map(bad_job, [9, 4, 3, 1, 1])
1✔
75

76

77
if __name__ == "__main__":
1✔
78
    unittest.main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc