• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 5569657923

pending completion
5569657923

Pull #206

github-actions

web-flow
Merge 7cd76d976 into 4e7f0acf5
Pull Request #206: Implement a concurrent.futures.Executor for pysqa

130 of 130 new or added lines in 5 files covered. (100.0%)

839 of 984 relevant lines covered (85.26%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.77
/pysqa/executor/executor.py
1
import os
1✔
2
import queue
1✔
3
from threading import Thread
1✔
4
from concurrent.futures import Future, Executor as FutureExecutor
1✔
5

6
from pympipool import cancel_items_in_queue
1✔
7
from pysqa.executor.helper import (
1✔
8
    reload_previous_futures,
9
    find_executed_tasks,
10
    serialize_funct,
11
    write_to_file,
12
)
13

14

15
class Executor(FutureExecutor):
    """``concurrent.futures.Executor`` front end backed by a queue-adapter job.

    On construction a job running ``python -m pysqa.executor`` is submitted
    through the supplied queue adapter, and a local background thread running
    ``find_executed_tasks`` is started on the shared cache directory.
    Submitted callables are serialized into that cache directory and tracked
    by key, so submitting the same serialized task twice returns the same
    ``Future``.
    """

    def __init__(self, cwd=None, queue_adapter=None, queue_adapter_kwargs=None):
        """Submit the remote worker job and start the local watcher thread.

        Args:
            cwd: Directory used as cache directory; ``~`` is expanded and the
                path is made absolute. Required despite the ``None`` default.
            queue_adapter: Object providing ``submit_job(...)`` and
                ``delete_job(process_id=...)``. Required despite the ``None``
                default.
            queue_adapter_kwargs: Mapping forwarded as keyword arguments to
                ``queue_adapter.submit_job``. Must contain ``"cores"``, which
                is also written into the worker command line. Required
                despite the ``None`` default.

        Raises:
            TypeError: If any of the three arguments is left as ``None``.
            KeyError: If ``queue_adapter_kwargs`` has no ``"cores"`` entry.
        """
        # Validate before touching the cache directory or the queuing system,
        # so that a misconfigured call fails with a clear message and without
        # side effects instead of an opaque error half-way through set-up.
        if cwd is None:
            raise TypeError("Executor() requires 'cwd': the cache directory path.")
        if queue_adapter is None:
            raise TypeError(
                "Executor() requires 'queue_adapter': an object providing "
                "submit_job() and delete_job()."
            )
        if queue_adapter_kwargs is None:
            raise TypeError(
                "Executor() requires 'queue_adapter_kwargs': a mapping that "
                "contains at least the key 'cores'."
            )
        cores = queue_adapter_kwargs["cores"]

        self._task_queue = queue.Queue()
        self._memory_dict = {}
        self._cache_directory = os.path.abspath(os.path.expanduser(cwd))
        self._queue_adapter = queue_adapter

        # Hand the (still empty) queue and dict to the helper together with
        # the cache directory before any new work is submitted.
        reload_previous_futures(
            future_queue=self._task_queue,
            future_dict=self._memory_dict,
            cache_directory=self._cache_directory,
        )

        # NOTE(review): the path is inserted unquoted; a cache directory that
        # contains whitespace may need quoting depending on how the queue
        # adapter executes the command - confirm.
        command = (
            f"python -m pysqa.executor --cores {cores!s} "
            f"--path {self._cache_directory!s}"
        )
        self._queue_id = self._queue_adapter.submit_job(
            working_directory=self._cache_directory,
            command=command,
            **queue_adapter_kwargs
        )

        # Background thread that consumes the task queue for this executor.
        self._process = Thread(
            target=find_executed_tasks,
            kwargs={
                "future_queue": self._task_queue,
                "cache_directory": self._cache_directory,
            },
        )
        self._process.start()

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return its ``Future``.

        The call is serialized with ``serialize_funct``; the first key of the
        resulting mapping identifies the task. A task whose key is already
        known is not written or queued again - the existing ``Future`` is
        returned instead.

        Args:
            fn: Callable to execute.
            *args: Positional arguments for ``fn``.
            **kwargs: Keyword arguments for ``fn``.

        Returns:
            concurrent.futures.Future: The future tracked under the task key.
        """
        funct_dict = serialize_funct(fn, *args, **kwargs)
        key = next(iter(funct_dict))
        if key not in self._memory_dict:
            future = Future()
            # Write the task file first: if this raises, no future is
            # registered, so a retry does not get back a future that was
            # never queued.
            write_to_file(
                funct_dict=funct_dict, state="in", cache_directory=self._cache_directory
            )
            self._memory_dict[key] = future
            self._task_queue.put({key: future})
        return self._memory_dict[key]

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Stop the watcher thread and delete the submitted queue job.

        Args:
            wait: Forwarded to the watcher thread inside the shutdown message.
            cancel_futures: If true, ``cancel_items_in_queue`` is applied to
                the task queue before the shutdown message is enqueued.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._task_queue)
        # The shutdown message is placed on the same queue the watcher reads.
        self._task_queue.put({"shutdown": True, "wait": wait})
        self._queue_adapter.delete_job(process_id=self._queue_id)
        self._process.join()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc