• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cokelaer / easydev / 22571376291

02 Mar 2026 10:15AM UTC coverage: 82.318%. Remained the same
22571376291

Pull #42

github

web-flow
Merge e7beb4a13 into 4cce71a21
Pull Request #42: Improve README with installation, feature overview, and quick-start examples

689 of 837 relevant lines covered (82.32%)

1.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.83
/easydev/multicore.py
1
# -*- python -*-
2
# -*- coding: utf-8 -*-
3
#
4
#  This file is part of the easydev software
5
#
6
#  Copyright (c) 2011-2024
7
#
8
#  File author(s): Thomas Cokelaer <cokelaer@gmail.com>
9
#
10
#  Distributed under the BSD3 License.
11
#
12
#  Website: https://github.com/cokelaer/easydev
13
#  Documentation: http://easydev-python.readthedocs.io
14
#
15
##############################################################################
16
import time
2✔
17
from multiprocessing import Pool, Process, Queue, cpu_count
2✔
18

19
__all__ = ["MultiProcessing"]
2✔
20

21

22
def _init_worker():
    """Make the calling process ignore ``SIGINT``.

    Meant to be used as the ``initializer`` of a
    :class:`multiprocessing.Pool`: with the workers ignoring Ctrl-C, only
    the parent process receives the ``KeyboardInterrupt`` and can decide
    how to shut the pool down.
    """
    # Imported locally so the module is only loaded where the initializer runs.
    import signal

    signal.signal(signal.SIGINT, signal.SIG_IGN)
26

27

28
class MultiProcessing(object):
    """Class to run jobs in an asynchronous manner.

    You would use this class to run several jobs on a local computer that has
    several cpus.


    ::

        t = MultiProcessing(maxcpu=2)
        t.add_job(func, func_args)
        t.run()
        t.results[0] # contain returned object from the function *func*.


    .. warning:: the function must be a function, not a method. This is inherent
        to multiprocess in the multiprocessing module.

    .. warning:: the order in the results list may not be the same as the
        list of jobs. see :meth:`run` for details

    Attributes set by this class:

    * ``jobs``: list of :class:`multiprocessing.Process` objects used as
      containers for the target function and its arguments,
    * ``results``: list of the values returned by the jobs that succeeded,
    * ``errors``: list of the exceptions raised by the jobs that failed,
    * ``finished``: False until :meth:`run` has completed.

    """

    def __init__(self, maxcpu=None, verbose=False, progress=True):
        """

        :param maxcpu: number of worker processes in the pool; defaults to
            the value returned by multiprocessing.cpu_count()
        :param verbose: print the output of each job. Could be very verbose
            so we advise to keep it False.
        :param progress: shows the progress


        """
        if maxcpu is None:
            maxcpu = cpu_count()

        self.maxcpu = maxcpu
        self.reset()
        self.verbose = verbose
        self.progress = progress

    def reset(self):
        """remove jobs, results and errors"""
        self.jobs = []  # a list of processes
        # Plain lists: the callbacks below rely on len() and append().
        self.results = []
        self.errors = []
        self.finished = False

    def add_job(self, func, *args, **kargs):
        """add a job in the pool

        :param func: the function to call
        :param args: positional arguments passed to *func*
        :param kargs: keyword arguments passed to *func*
        """
        if self.verbose:
            print(
                "Adding jobs in the queue..",
            )
        # The Process is never started; it only stores the target and its
        # arguments until run() hands them over to the pool.
        t = Process(target=func, args=args, kwargs=kargs)
        self.jobs.append(t)

    def _update_progress(self):
        """Refresh the progress bar with the number of completed jobs."""
        if self.progress:
            self.pb.animate(len(self.results) + len(self.errors))

    def _cb(self, results):
        """Success callback: store the value returned by one job."""
        if self.verbose:
            print("callback", results)
        self.results.append(results)
        self._update_progress()

    def _cb_error(self, error):
        """Failure callback: store the exception raised by one job.

        Without it, a failing job would never be accounted for and
        :meth:`run` could not tell that all jobs are over.
        """
        if self.verbose:
            print("callback error", error)
        self.errors.append(error)
        self._update_progress()

    def run(self, delay=0.1, verbose=True):
        """Run all the jobs in the Pool until all have finished.

        Jobs that have been added to the job list in :meth:`add_job`
        are now processed in this method by using a Pool. Here, we add
        all jobs using the apply_async method from multiprocess module.

        In order to ensure that the jobs are run sequentially in the same
        order as in :attr:`jobs`, we introduce a delay between 2 calls
        to apply_async (see http://docs.python.org/2/library/multiprocessing.html)

        A better way may be to use a Manager but for now, this works.

        Returned values are appended to :attr:`results` and exceptions raised
        by the jobs are appended to :attr:`errors`, in both cases as the
        callbacks are fired.

        :param delay: time in seconds to sleep after each submission; set it
            to 0 if the order of the results does not matter
        :param verbose: not used by this method; kept for backward
            compatibility

        """
        if self.progress:
            # Local import, only needed when a progress bar is requested.
            from easydev import Progress

            self.pb = Progress(len(self.jobs), 1)
            self.pb.animate(0)

        self.results = []
        self.errors = []
        self.finished = False
        self.pool = Pool(self.maxcpu, _init_worker)

        try:
            handles = []
            for process in self.jobs:
                handle = self.pool.apply_async(
                    process._target,
                    process._args,
                    process._kwargs,
                    callback=self._cb,
                    error_callback=self._cb_error,
                )
                handles.append(handle)

                # ensure the results have same order as jobs
                # maybe important if you expect the order of the results to
                # be the same as input; otherwise set delay to 0
                time.sleep(delay)

            # A handle becomes ready once its callback has been executed, so
            # leaving this loop means every job is in results or in errors.
            # The timeout keeps the wait responsive to a KeyboardInterrupt.
            for handle in handles:
                while not handle.ready():
                    handle.wait(1)

        except KeyboardInterrupt:  # pragma: no cover
            print(
                "\nCaught interruption. " + "Terminating the Pool of processes... ",
            )
            self.pool.terminate()
            self.pool.join()
            print("... done")
        else:
            # Closing properly the pool
            self.pool.close()
            self.pool.join()

        # Pool cannot be pickled. So, if we want to pickle "MultiProcessing"
        # class itself, we must destroy this instance
        del self.pool

        self.finished = True
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc