• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ssec / sift / 13528712524

25 Feb 2025 06:50PM UTC coverage: 29.691% (-20.2%) from 49.871%
13528712524

push

github

web-flow
Merge pull request #437 from ameraner/fix_export_image_float

Deactivate export image and rgb config tests to avoid Segfaults in tests and add explicit float casting for Fraction call to fix export tests

0 of 1 new or added line in 1 file covered. (0.0%)

2747 existing lines in 33 files now uncovered.

4386 of 14772 relevant lines covered (29.69%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.11
/uwsift/queue.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
"""
2✔
4
queue.py
5
~~~~~~~~
6

7
PURPOSE
8
Global background task queue for loading, rendering, et cetera.
9
Use TheQueue.add() to create background behavior.
10
Note that Qt facilities other than signals should not be used on the task queue!
11

12
REFERENCES
13

14

15
REQUIRES
16

17

18
:author: R.K.Garcia <rayg@ssec.wisc.edu>
19
:copyright: 2014 by University of Wisconsin Regents, see AUTHORS for more details
20
:license: GPLv3, see LICENSE for more details
21
"""
22
__author__ = "rayg"
2✔
23
__docformat__ = "reStructuredText"
2✔
24

25
import logging
2✔
26
from collections import OrderedDict
2✔
27

28
from PyQt5.QtCore import QObject, QThread, pyqtSignal
2✔
29

30
LOG = logging.getLogger(__name__)
2✔
31

32
# keys for status dictionaries
# Each key is a (name, type) tuple and the tuple itself is used as the dictionary key
# (see the idle status emitted by TaskQueue._did_progress).
# NOTE(review): the second element presumably names the expected value type - confirm with consumers.
33
TASK_DOING = ("activity", str)
2✔
34
TASK_PROGRESS = ("progress", float)  # 0.0 - 1.0 progress
2✔
35

36
# singleton instance used by clients
# Stays None until a TaskQueue is constructed; TaskQueue.__init__ assigns itself to this name.
37
TheQueue = None
2✔
38

39

40
class Worker(QThread):
    """
    Worker thread used by TaskQueue.

    Pending tasks are kept in an ordered mapping of ``key -> iterable`` and are consumed
    first-in-first-out by :meth:`run`. Every item yielded by a task is forwarded through
    ``workerDidMakeProgress``; once a task is exhausted (or raises) ``workerDidCompleteTask``
    is emitted with the task key and a success flag.
    """

    # worker id, sequence of dictionaries listing update information to be propagated to view
    workerDidMakeProgress = pyqtSignal(int, list)
    # task-key, ok: False if exception occurred else True
    workerDidCompleteTask = pyqtSignal(str, bool)

    def __init__(self, myid: int):
        """
        Create an idle worker.

        :param myid: identifier passed along with every ``workerDidMakeProgress`` emission
        """
        super().__init__()
        self.queue: OrderedDict = OrderedDict()  # key -> task iterable, oldest first
        self.depth = 0  # queue length recorded at the most recent add(); reset to 0 when run() drains
        self.id = myid

    def add(self, key, task_iterable):
        """
        Queue a task and make sure the thread is started.

        If ``key`` is already pending, its task is replaced and moved to the end of the queue.

        :param key: key identifying the task; emitted with ``workerDidCompleteTask``
        :param task_iterable: iterable whose items are reported as progress by :meth:`run`
        :return: None
        """
        self.queue[key] = task_iterable
        try:
            # Plain assignment keeps an existing key at its old position; push it to the back
            # so a re-queued task is deferred behind everything already waiting.
            self.queue.move_to_end(key)
        except KeyError:
            # The running thread dequeued the entry between the two statements above,
            # so there is nothing left to reorder.
            pass
        self.depth = len(self.queue)
        # NOTE(review): per Qt documentation start() does nothing when the thread is already
        # running - confirm a task added while run() is returning cannot be left waiting.
        self.start()

    def _did_progress(self, task_status):
        """
        Summarize the task queue, including progress, and send it out as a signal

        :param task_status: status item yielded by the current task; a falsy value is
            reported as an empty list
        :return: None
        """
        # FIXME: this should have entries for the upcoming stuff as well so we can have an Activity panel
        info = [task_status] if task_status else []
        self.workerDidMakeProgress.emit(self.id, info)

    def run(self):
        """
        Consume queued tasks oldest-first until the queue is empty.

        Each status yielded by a task is forwarded via :meth:`_did_progress`. An exception
        raised by a task is logged and reported as ``ok=False``; remaining tasks still run.
        After the queue drains, ``depth`` is reset and one final empty progress update is sent.
        """
        while self.queue:
            key, task = self.queue.popitem(last=False)
            ok = True
            try:
                for status in task:
                    self._did_progress(status)
            except Exception:
                # Thread boundary: record the traceback and keep serving the remaining tasks.
                LOG.exception("Background task exception: ")
                ok = False
            self.workerDidCompleteTask.emit(key, ok)
        self.depth = 0
        self._did_progress(None)
×
87

88

89
class TaskQueue(QObject):
    """
    Global background task queue for loading, rendering, et cetera.
    Includes state updates and GUI links.
    Eventually will include thread pools and multiprocess pools.

    Three workers are created: interactive tasks alternate between workers 0 and 1,
    all other tasks go to worker 2.
    """

    didMakeProgress = pyqtSignal(list)  # sequence of dictionaries listing update information to be propagated to view

    def __init__(self, process_pool=None):
        """
        Create the workers and register this instance as the module-level ``TheQueue``.

        :param process_pool: stored as ``self.process_pool``; not used elsewhere in this class
        """
        super().__init__()
        self._interactive_round_robin = 0  # index of the worker that gets the next interactive task
        self.process_pool = process_pool  # thread pool for background activity
        self._completion_futures = {}  # task key -> callable(succeeded: bool) run on completion
        self._last_status = []  # list of last status reports for different workers
        self.workers = []  # thread pool for background activity
        for worker_id in range(3):
            worker = Worker(worker_id)
            worker.workerDidMakeProgress.connect(self._did_progress)
            worker.workerDidCompleteTask.connect(self._did_complete_task)
            self.workers.append(worker)
            self._last_status.append(None)

        global TheQueue
        # only one TaskQueue may exist; a second construction is a programming error
        assert TheQueue is None  # nosec B101
        TheQueue = self

    @property
    def depth(self):
        """Sum of the ``depth`` values of all workers."""
        return sum(worker.depth for worker in self.workers)

    @property
    def remaining(self):
        """Total number of tasks still waiting in the workers' queues."""
        return sum(len(worker.queue) for worker in self.workers)

    def add(
        self,
        key,
        task_iterable,
        description,
        interactive=False,
        and_then=None,
        use_process_pool=False,
        use_thread_pool=False,
    ):
        """Add an iterable task which will yield progress information dictionaries.

        Expect behavior like this::

         for task in queue:
            for status_info in task:
                update_display(status_info)
            pop_display(final_status_info)

        Args:
            key (str): unique key for task; reported back when the task completes and used
                to look up ``and_then``
            task_iterable (iter): iterable yielding status dictionaries; it is iterated by
                the selected worker
            description: currently unused by this method
            interactive (bool): if true, alternate between the interactive workers;
                otherwise use the last (background) worker
            and_then: if callable, invoked as ``and_then(succeeded)`` once the task
                registered under ``key`` completes
            use_process_pool: currently unused by this method
            use_thread_pool: currently unused by this method

        """
        # the last worker handles background tasks, all preceding ones are interactive
        background_index = len(self.workers) - 1
        if interactive:
            wdex = self._interactive_round_robin
            self._interactive_round_robin = (wdex + 1) % background_index
        else:
            wdex = background_index
        if callable(and_then):
            self._completion_futures[key] = and_then
        self.workers[wdex].add(key, task_iterable)

    def _did_progress(self, worker_id, worker_status):
        """
        Summarize the task queue, including progress, and send it out as a signal

        :param worker_id: index of the reporting worker
        :param worker_status: list of active items, or at least the thing it's working on now
        :return: None
        """
        # FIXME: this should consolidate entries for the upcoming stuff as well so we can have an Activity panel

        self._last_status[worker_id] = worker_status

        # report on the lowest worker number that's active; (0,1 interactive; 2 background)
        # yes, this will be redundant
        # FUTURE make this a more useful signal content, rather than relying on progress_ratio back-query
        for worker, status in zip(self.workers, self._last_status):
            if worker.isRunning() and status is not None:
                self.didMakeProgress.emit(status)
                return

        # otherwise this is a notification that we're finally at full idle
        self.didMakeProgress.emit([{TASK_DOING: "", TASK_PROGRESS: 0.0}])
        # FUTURE: consider one progress bar per worker

    def _did_complete_task(self, task_key: str, succeeded: bool):
        """
        Run (and forget) the completion callback registered for a finished task, if any.

        :param task_key: key the task was added under
        :param succeeded: False if the task raised an exception, else True
        :return: None
        """
        todo = self._completion_futures.pop(task_key, None)
        if callable(todo):
            LOG.debug("completed task %s (succeeded=%s), and_then we do this...", task_key, succeeded)
            todo(succeeded)

    def progress_ratio(self, current_progress=None):
        """
        Estimate overall progress as a fraction.

        :param current_progress: progress of the task currently running, if known
        :return: 0.0 when nothing is queued; ``current_progress`` when exactly one task is
            counted and a value was supplied; otherwise ``(depth - remaining) / depth``
        """
        # Read depth once: workers reset their depth from their own thread, and reading it a
        # second time after the zero check could yield 0 and divide by zero.
        depth = self.depth
        if depth == 0:
            return 0.0
        if depth == 1 and current_progress is not None:
            # show something other than 50% if there is only 1 job
            return current_progress
        return float(depth - self.remaining) / depth
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc