• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 5568662821

pending completion
5568662821

Pull #206

github-actions

web-flow
Merge a12e0fd0b into 20deb1de9
Pull Request #206: Implement a concurrent.futures.Executor for pysqa

124 of 124 new or added lines in 5 files covered. (100.0%)

774 of 978 relevant lines covered (79.14%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.26
/pysqa/executor/executor.py
1
import os
1✔
2
import queue
1✔
3
from threading import Thread
1✔
4
from concurrent.futures import Future, Executor as FutureExecutor
1✔
5

6
from pympipool import cancel_items_in_queue
1✔
7
from pysqa.executor.helper import (
1✔
8
    reload_previous_futures,
9
    find_executed_tasks,
10
    serialize_funct,
11
    write_to_file,
12
)
13

14

15
class Executor(FutureExecutor):
    """Executor with the ``concurrent.futures`` interface backed by a pysqa queue.

    On construction a single job running ``python -m pysqa.executor`` is
    submitted through the given queue adapter, and a background thread is
    started that runs ``find_executed_tasks`` on the shared task queue and
    cache directory. Submitted callables are serialized into the cache
    directory; presumably the remote job picks them up and the background
    thread resolves the matching futures (confirm against
    ``pysqa.executor.helper``).
    """

    def __init__(self, cwd=None, queue_adapter=None, queue_adapter_kwargs=None):
        """Submit the worker job and start the thread that watches for results.

        Args:
            cwd: Cache/working directory; ``~`` is expanded and the path is
                made absolute. Defaults to the current working directory.
            queue_adapter: Object providing ``submit_job`` and ``delete_job``.
                Required in practice, ``None`` fails on the first call.
            queue_adapter_kwargs: Extra keyword arguments forwarded to
                ``queue_adapter.submit_job``. Its ``"cores"`` entry is also
                passed to the worker command line (1 when absent).
        """
        # The signature advertises None defaults, but expanduser(None) and
        # None["cores"] raise TypeError, so substitute usable values first.
        if cwd is None:
            cwd = os.getcwd()
        # Copy so the caller's mapping is never shared with this instance.
        queue_adapter_kwargs = dict(queue_adapter_kwargs or {})

        self._task_queue = queue.Queue()
        self._memory_dict = {}
        self._cache_directory = os.path.abspath(os.path.expanduser(cwd))
        self._queue_adapter = queue_adapter
        reload_previous_futures(
            future_queue=self._task_queue,
            future_dict=self._memory_dict,
            cache_directory=self._cache_directory,
        )
        command = self._build_command(
            cores=queue_adapter_kwargs.get("cores", 1),
            cache_directory=self._cache_directory,
        )
        self._queue_id = self._queue_adapter.submit_job(
            working_directory=self._cache_directory,
            command=command,
            **queue_adapter_kwargs
        )
        self._process = Thread(
            target=find_executed_tasks,
            kwargs={
                "future_queue": self._task_queue,
                "cache_directory": self._cache_directory,
            },
        )
        self._process.start()

    @staticmethod
    def _build_command(cores, cache_directory):
        """Return the shell command (a single ``str``) that starts the worker.

        The previous inline expression ended with a trailing comma, which
        silently turned the command into a one-element tuple.
        """
        return (
            "python -m pysqa.executor --cores "
            + str(cores)
            + " --path "
            + str(cache_directory)
        )

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return its ``Future``.

        The call is serialized with ``serialize_funct``; the first key of the
        resulting dict identifies the task. A task whose key is already known
        is not written or queued again, the existing future is returned.
        """
        funct_dict = serialize_funct(fn, *args, **kwargs)
        key = next(iter(funct_dict))
        if key not in self._memory_dict:
            future = Future()
            self._memory_dict[key] = future
            # Persist the task before announcing it on the queue.
            write_to_file(
                funct_dict=funct_dict, state="in", cache_directory=self._cache_directory
            )
            self._task_queue.put({key: future})
        return self._memory_dict[key]

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Stop the background thread and delete the submitted queue job.

        Args:
            wait: Forwarded to the background thread inside the shutdown
                message placed on the task queue.
            cancel_futures: When true, ``cancel_items_in_queue`` is applied to
                the task queue before the shutdown message is enqueued.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._task_queue)
        self._task_queue.put({"shutdown": True, "wait": wait})
        self._queue_adapter.delete_job(process_id=self._queue_id)
        self._process.join()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc