• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icecube / flarestack / 4536597083

pending completion
4536597083

Pull #268

github

GitHub
Merge 0d13f2846 into e550ab59c
Pull Request #268: Compliance with black v23

26 of 26 new or added lines in 11 files covered. (100.0%)

4405 of 5755 relevant lines covered (76.54%)

2.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.83
/flarestack/core/multiprocess_wrapper.py
1
import pickle
3✔
2
import logging
3✔
3
import time
3✔
4
from logging.handlers import QueueHandler, QueueListener
3✔
5
import argparse
3✔
6
from flarestack.core.minimisation import MinimisationHandler, read_mh_dict
3✔
7
from multiprocessing import JoinableQueue, Process, Queue, Value
3✔
8
import random
3✔
9
from multiprocessing import set_start_method
3✔
10

11
# Module-level logger. Worker processes attach a QueueHandler to it in
# MultiProcessor.run_trial so their records travel back to the parent's
# QueueListener.
logger = logging.getLogger(__name__)

# Force the "fork" start method so worker processes inherit the parent's
# memory, including the MinimisationHandler and injectors built in
# MultiProcessor.__init__. set_start_method raises RuntimeError if the
# start method was already fixed (e.g. on re-import); in that case the
# existing method is kept.
try:
    set_start_method("fork")
except RuntimeError:
    pass
×
17

18

19
def generate_dynamic_mh_class(mh_dict):
    """Build a MinimisationHandler instance whose injectors are disabled.

    Worker processes receive datasets already prepared (injected) by the
    parent process, so the handler they run must not create injectors of
    its own. This dynamically subclasses the MinimisationHandler named in
    ``mh_dict["mh_name"]``, overrides ``add_injector`` with a no-op, and
    returns an instance of that subclass.

    :param mh_dict: Analysis dictionary; must contain an ``"mh_name"`` key
        naming a registered MinimisationHandler subclass.
    :return: Instance of the dynamically-created subclass.
    :raises KeyError: If ``"mh_name"`` is missing, or if it matches no
        registered MinimisationHandler subclass.
    """
    # mh_dict = read_mh_dict(mh_dict)

    try:
        mh_name = mh_dict["mh_name"]
    except KeyError:
        # "from None" suppresses the noisy implicit exception chain.
        raise KeyError("No MinimisationHandler specified.") from None

    # Set up dynamic inheritance

    try:
        ParentMinimisationHandler = MinimisationHandler.subclasses[mh_name]
    except KeyError:
        raise KeyError("Parent class {} not found.".format(mh_name)) from None

    class MultiProcessingMinimisationHandler(ParentMinimisationHandler):
        # Injection happens in the parent process; workers only fit.
        def add_injector(self, season, sources):
            pass

    return MultiProcessingMinimisationHandler(mh_dict)
×
39

40

41
class MultiProcessor:
    """Farm out trials of a MinimisationHandler across worker processes.

    The parent process prepares each (scale, seed) dataset and the workers
    (forked in ``__init__``, so they inherit ``self``) fetch items from a
    shared JoinableQueue and run the fits. A shared integer ``n_tasks``
    counts outstanding trials; worker log records are funnelled through a
    multiprocessing Queue to a QueueListener in the parent. Supports use
    as a context manager (``with MultiProcessor(...) as r:``).
    """

    # Class-level defaults, overwritten per instance in __init__.
    queue = None
    results = dict()

    def __init__(self, n_cpu, **kwargs):
        """Set up queues, spawn ``n_cpu`` workers, and start log listening.

        :param n_cpu: Number of worker processes to fork.
        :param kwargs: Must contain "mh_dict", the analysis dictionary;
            a shared "n_tasks" counter is injected before being passed to
            the workers.
        """
        self.queue = JoinableQueue()
        self.log_queue = Queue()
        # Shared signed-int counter of trials still to be processed.
        self.n_tasks = Value("i", 0)
        kwargs["n_tasks"] = self.n_tasks

        self.processes = [
            Process(target=self.run_trial, kwargs=kwargs) for _ in range(int(n_cpu))
        ]

        # Build the full handler (with injectors) in the parent only;
        # workers get a no-injector variant via generate_dynamic_mh_class.
        self.mh = MinimisationHandler.create(kwargs["mh_dict"])
        for season in self.mh.seasons.keys():
            inj = self.mh.get_injector(season)
            inj.calculate_n_exp()
        self.mh_dict = kwargs["mh_dict"]
        # Scales seen by fill_queue; used later to dump injection values.
        self.scales = []

        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s")
        )
        # ql gets records from the queue and sends them to the handler

        ql = QueueListener(self.log_queue, handler)
        ql.start()

        # Start workers last so they fork with the fully-initialised state.
        for p in self.processes:
            p.start()

    def add_to_queue(self, item):
        """Enqueue one (scale, seed) work item for the workers."""
        self.queue.put(item)

    def dump_all_injection_values(self):
        """Write injection values for every scale queued so far."""
        for scale in self.scales:
            self.mh.dump_injection_values(scale)

    def run_trial(self, **kwargs):
        """Worker loop: consume (scale, seed) items until a None sentinel.

        Runs in the child process. Attaches a QueueHandler so log records
        reach the parent's listener, then repeatedly prepares a dataset
        (via the inherited parent handler) and fits it with the
        injector-free handler, decrementing the shared task counter.
        """
        qh = QueueHandler(self.log_queue)
        logger.addHandler(qh)

        mh_dict = kwargs["mh_dict"]

        mpmh = generate_dynamic_mh_class(mh_dict)

        n_tasks = kwargs["n_tasks"]

        while True:
            item = self.queue.get()
            if item is None:
                break

            (scale, seed) = item

            full_dataset = self.mh.prepare_dataset(scale, seed)

            mpmh.run_single(full_dataset, scale, seed)
            # Lock guards the decrement against concurrent workers.
            with n_tasks.get_lock():
                n_tasks.value -= 1
            self.queue.task_done()

    def fill_queue(self):
        """Enqueue all trials for the analysis, then block until done.

        Derives (scale_range, n_trials) from the analysis dictionary,
        queues one randomly-seeded item per trial, and polls the shared
        counter every 30 s until the workers have drained everything.
        """
        scale_range, n_trials = self.mh.trial_params(self.mh_dict)

        self.scales = scale_range

        for scale in scale_range:
            for _ in range(n_trials):
                # Random 8-digit seed so each trial is independent.
                self.add_to_queue((scale, int(random.random() * 10**8)))

        n_tasks = len(scale_range) * n_trials
        with self.n_tasks.get_lock():
            self.n_tasks.value += n_tasks

        logger.info("Added {0} trials to queue. Now processing.".format(n_tasks))

        while self.n_tasks.value > 0.0:
            logger.info("{0} tasks remaining.".format(self.n_tasks.value))
            time.sleep(30)
        logger.info("Finished processing {0} tasks.".format(n_tasks))

    def terminate(self):
        """wait until queue is empty and terminate processes"""
        self.queue.join()
        for p in self.processes:
            p.terminate()

        self.dump_all_injection_values()

    def __enter__(self):
        # Context-manager entry: no extra setup beyond __init__.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Ensure workers are torn down and results dumped on exit.
        self.terminate()
138

139

140
def run_multiprocess(n_cpu, mh_dict):
    """Run all trials described by *mh_dict* across *n_cpu* worker processes.

    Creates a MultiProcessor, queues every trial and blocks until they are
    processed. Worker teardown (queue join, process termination, dumping
    injection values) is handled once by the context manager's ``__exit__``;
    the previous explicit ``terminate()`` call duplicated that work, joining
    the queue and dumping injection values twice.

    :param n_cpu: Number of worker processes to use.
    :param mh_dict: Analysis dictionary passed to the MinimisationHandler.
    """
    with MultiProcessor(n_cpu=n_cpu, mh_dict=mh_dict) as r:
        r.fill_queue()
3✔
145

146

147
if __name__ == "__main__":
    # Command-line entry point: load a pickled analysis dictionary and run
    # it across multiple cores.
    import os

    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--file", help="Path for analysis pkl_file")
    # Default: all but one available core, capped at 32. os.cpu_count() may
    # return None (undeterminable), in which case fall back to one worker.
    # type=int ensures a CLI-supplied value matches the int default, rather
    # than arriving as a str.
    parser.add_argument(
        "-n",
        "--n_cpu",
        type=int,
        default=min(max(1, (os.cpu_count() or 2) - 1), 32),
    )
    cfg = parser.parse_args()

    logger.info(f"N CPU available {os.cpu_count()}. Using {cfg.n_cpu}")

    with open(cfg.file, "rb") as f:
        # NOTE: pickle.load on an untrusted file can execute arbitrary
        # code; only run this on analysis files you created.
        mh_dict = pickle.load(f)

    run_multiprocess(n_cpu=cfg.n_cpu, mh_dict=mh_dict)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc