• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pysqa / 9825005101

07 Jul 2024 05:48AM UTC coverage: 83.114%. Remained the same
9825005101

Pull #311

github

web-flow
Merge f205d4b02 into e80dd8103
Pull Request #311: Update conda-incubator/setup-miniconda from v2 to v3

886 of 1066 relevant lines covered (83.11%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.88
/pysqa/executor/executor.py
1
import os
1✔
2
import queue
1✔
3
from concurrent.futures import Executor as FutureExecutor
1✔
4
from concurrent.futures import Future
1✔
5
from typing import Optional
1✔
6

7
from pympipool.shared import RaisingThread, cancel_items_in_queue
1✔
8

9
from pysqa.executor.helper import (
1✔
10
    find_executed_tasks,
11
    reload_previous_futures,
12
    serialize_funct,
13
    write_to_file,
14
)
15

16

17
class Executor(FutureExecutor):
    """``concurrent.futures``-style executor backed by a queuing-system job.

    On construction a single job running ``python -m pysqa.executor`` is
    submitted through the given queue adapter. Submitted functions are
    serialized into the cache directory, and a background thread running
    ``find_executed_tasks`` is handed the task queue and the cache directory
    (presumably to complete the futures once results appear - confirm against
    ``pysqa.executor.helper``).
    """

    def __init__(
        self,
        cwd: Optional[str] = None,
        queue_adapter=None,
        queue_adapter_kwargs: Optional[dict] = None,
    ):
        """Submit the worker job and start the background watcher thread.

        Args:
            cwd: Directory used as cache/working directory. ``~`` is expanded
                and the path is made absolute. ``None`` selects the current
                working directory.
            queue_adapter: Object providing ``submit_job(...)`` and
                ``delete_job(process_id=...)``. Required despite the ``None``
                default in the signature.
            queue_adapter_kwargs: Extra keyword arguments forwarded to
                ``queue_adapter.submit_job``. Its ``"cores"`` entry is also
                written into the worker command line; when the entry is absent
                the command line uses ``1``.

        Raises:
            ValueError: If ``queue_adapter`` is ``None``.
        """
        import shlex

        # Fail before touching the file system or starting threads; without an
        # adapter the constructor could only crash later with an opaque
        # AttributeError on ``None.submit_job``.
        if queue_adapter is None:
            raise ValueError("Executor requires a queue_adapter to submit jobs.")

        # ``os.path.expanduser(None)`` raises TypeError, so resolve the default
        # explicitly.
        if cwd is None:
            cwd = os.getcwd()

        # Copy so the caller's dict is never shared; ``None`` means "no extras".
        submit_kwargs = dict(queue_adapter_kwargs) if queue_adapter_kwargs else {}
        cores = submit_kwargs.get("cores", 1)

        self._task_queue = queue.Queue()
        self._memory_dict = {}
        self._cache_directory = os.path.abspath(os.path.expanduser(cwd))
        self._queue_adapter = queue_adapter
        reload_previous_futures(
            future_queue=self._task_queue,
            future_dict=self._memory_dict,
            cache_directory=self._cache_directory,
        )
        # Quote the path so directories containing spaces or shell
        # metacharacters survive the submission script; plain paths are
        # emitted unchanged.
        command = (
            f"python -m pysqa.executor --cores {cores}"
            f" --path {shlex.quote(self._cache_directory)}"
        )
        self._queue_id = self._queue_adapter.submit_job(
            working_directory=self._cache_directory,
            command=command,
            **submit_kwargs,
        )
        self._process = RaisingThread(
            target=find_executed_tasks,
            kwargs={
                "future_queue": self._task_queue,
                "cache_directory": self._cache_directory,
            },
        )
        self._process.start()

    def submit(self, fn: callable, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return its future.

        The call is serialized with ``serialize_funct``; the first key of the
        resulting dict identifies the task. If a future is already registered
        under that key it is returned as is and nothing new is written.

        Args:
            fn: Callable to execute.
            *args: Positional arguments for ``fn``.
            **kwargs: Keyword arguments for ``fn``.

        Returns:
            The ``Future`` registered for this task key.
        """
        funct_dict = serialize_funct(fn, *args, **kwargs)
        key = next(iter(funct_dict))
        if key not in self._memory_dict:
            future = Future()
            self._memory_dict[key] = future
            write_to_file(
                funct_dict=funct_dict,
                state="in",
                cache_directory=self._cache_directory,
            )
            self._task_queue.put({key: future})
        return self._memory_dict[key]

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Stop the watcher thread and delete the submitted queue job.

        Args:
            wait: Forwarded to the watcher thread inside the shutdown message.
                This method joins the watcher thread regardless of the value.
            cancel_futures: When true, ``cancel_items_in_queue`` is applied to
                the task queue before the shutdown message is enqueued.
        """
        if cancel_futures:
            cancel_items_in_queue(que=self._task_queue)
        self._task_queue.put({"shutdown": True, "wait": wait})
        self._queue_adapter.delete_job(process_id=self._queue_id)
        self._process.join()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc