• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

simonsobs / so_campaign_manager / 16224615683

11 Jul 2025 04:13PM UTC coverage: 48.564% (-2.1%) from 50.63%
16224615683

Pull #46

github

web-flow
Merge be40794fd into 484f19548
Pull Request #46: Gp/feat/mission set

49 of 145 branches covered (33.79%)

Branch coverage included in aggregate %.

78 of 185 new or added lines in 13 files covered. (42.16%)

10 existing lines in 4 files now uncovered.

526 of 1039 relevant lines covered (50.63%)

0.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.67
/src/socm/enactor/rp_enactor.py
1
# Imports from general packages
2
import os
1✔
3
import threading as mt
1✔
4
from copy import deepcopy
1✔
5
from datetime import datetime
1✔
6
from typing import List
1✔
7

8
# Imports from dependent packages
9
import radical.pilot as rp
1✔
10
import radical.utils as ru
1✔
11

12
from socm.core import Resource, Workflow
1✔
13
from socm.enactor.base import Enactor
1✔
14
from socm.utils import states as st
1✔
15

16

17
class RPEnactor(Enactor):
    """
    Enactor that executes workflows through RADICAL-Pilot.

    ``setup`` submits a single pilot and attaches it to the task manager.
    ``enact`` turns every received workflow into an RP task and submits it.
    A background monitor thread polls the task manager and reports workflows
    whose task reached a final RP state through the registered state
    callbacks.
    """

    # Seconds the monitor thread waits between two polls of the task manager.
    # The wait is done on the terminate event, so terminate() is not delayed.
    _monitor_interval = 1.0

    def __init__(self, sid):
        super().__init__(sid=sid)
        # IDs of the workflows that are executing and must be monitored.
        # Shared with the monitor thread; guarded by self._monitoring_lock.
        self._to_monitor = list()

        # Point RADICAL at the resource configuration shipped with this package.
        os.environ["RADICAL_CONFIG_USER_DIR"] = os.path.join(os.path.dirname(__file__), "..", "configs", "")
        self._prof.prof("enactor_setup", uid=self._uid)
        # Lock to provide atomicity in the monitoring data structure.
        self._monitoring_lock = ru.RLock("cm.monitor_lock")
        # Lock protecting the callback table (self._callbacks).
        self._cb_lock = ru.RLock("enactor.cb_lock")
        self._callbacks = dict()

        # Monitor thread, created lazily by the first call to enact().
        self._monitoring_thread = None
        # Set by terminate() to stop the monitor thread.
        self._terminate_monitor = mt.Event()

        self._run = False
        self._prof.prof("enactor_started", uid=self._uid)
        self._rp_session = rp.Session(uid=sid)
        self._rp_pmgr = rp.PilotManager(session=self._rp_session)
        self._rp_tmgr = rp.TaskManager(session=self._rp_session)
        self._logger.info("Enactor is ready")

    def setup(self, resource: Resource, walltime: int, cores: int) -> None:
        """
        Submit a pilot on ``resource`` and block until it is active.

        *resource:* Resource whose ``name`` selects the ``so.<name>`` RP resource config.
        *walltime:* Pilot runtime in minutes.
        *cores:* Number of cores requested for the pilot.
        """
        pd_init = {
            "resource": f"so.{resource.name}",
            "runtime": walltime,  # pilot runtime (min)
            "exit_on_error": True,
            "access_schema": "batch",
            "cores": cores,
        }

        pdesc = rp.PilotDescription(pd_init)
        pilot = self._rp_pmgr.submit_pilots(pdesc)
        self._rp_tmgr.add_pilots(pilot)

        pilot.wait(state=rp.PMGR_ACTIVE)
        self._logger.info("Pilot is ready")

    def enact(self, workflows: List[Workflow]) -> None:
        """
        Submit workflows for execution as RADICAL-Pilot tasks.

        Each workflow is translated into an ``rp.TaskDescription``. The task
        runs the workflow's executable with its optional subcommand followed by
        ``workflow.get_arguments()``, and uses the workflow's ranks, threads,
        memory and environment. It is registered as ``st.EXECUTING`` in
        ``self._execution_status`` and added to the monitoring list. All
        descriptions built in one call are submitted together. The monitor
        thread is started on the first call.

        Workflows the enactor has already seen are logged and skipped. A
        workflow whose description cannot be built is logged and skipped; the
        others are still submitted.

        *workflows:* The workflows to execute on the pilot created by ``setup``.
        """
        self._prof.prof("enacting_start", uid=self._uid)
        exec_workflows = []
        for workflow in workflows:
            # A workflow that was already enacted is only reported, never resubmitted.
            if workflow.id in self._execution_status:
                self._logger.info(
                    "Workflow %s is in state %s",
                    workflow,
                    st.state_dict[self._get_workflow_state(workflow.id)],
                )
                continue

            try:
                # Build the RP task from the workflow description and its resources.
                exec_workflow = rp.TaskDescription()
                exec_workflow.uid = f"workflow.{workflow.id}"

                exec_workflow.executable = workflow.executable
                exec_workflow.arguments = []
                if workflow.subcommand:
                    exec_workflow.arguments += [workflow.subcommand]
                exec_workflow.arguments += workflow.get_arguments()
                self._logger.debug("Workflow %s arguments: %s", workflow.id, exec_workflow.arguments)
                exec_workflow.ranks = workflow.resources["ranks"]
                exec_workflow.cores_per_rank = workflow.resources["threads"]
                exec_workflow.threading_type = rp.OpenMP
                exec_workflow.mem_per_rank = workflow.resources["memory"]  # this translates to memory per node
                # Print "<job id>.<step id>" last on stdout; _monitor reads it back as the step id.
                exec_workflow.post_exec = "echo ${SLURM_JOB_ID}.${SLURM_STEP_ID}"
                if workflow.environment:
                    exec_workflow.environment = workflow.environment
                self._logger.info("Enacting workflow %s", workflow.id)
                exec_workflows.append(exec_workflow)

                # Register the workflow for monitoring and record its state.
                with self._monitoring_lock:
                    self._to_monitor.append(workflow.id)
                    self._execution_status[workflow.id] = {
                        "state": st.EXECUTING,
                        "endpoint": exec_workflow,
                        "exec_thread": None,
                        "start_time": datetime.now(),
                        "end_time": None,
                    }

                self._notify_callbacks(
                    workflow_ids=[workflow.id],
                    new_state=st.EXECUTING,
                    step_ids=[None],
                )
            except Exception as ex:
                self._logger.error(f"Workflow {workflow} could not be executed")
                self._logger.error(f"Exception raised {ex}", exc_info=True)

        # Skip the RP call when there is nothing new to run, e.g. when every
        # workflow was already enacted.
        if exec_workflows:
            self._rp_tmgr.submit_tasks(exec_workflows)

        self._prof.prof("enacting_stop", uid=self._uid)
        # If there is no monitoring thread yet, start one.
        if self._monitoring_thread is None:
            self._logger.info("Starting monitor thread")
            self._monitoring_thread = mt.Thread(target=self._monitor, name="monitor-thread")
            self._monitoring_thread.start()

    def _notify_callbacks(self, workflow_ids, new_state, step_ids):
        """
        Invoke every registered state callback with one state update.

        The callback table is copied under ``self._cb_lock``. A concurrent
        ``register_state_cb`` therefore cannot resize the dict while it is
        being iterated, which would raise ``RuntimeError``.
        """
        with self._cb_lock:
            callbacks = list(self._callbacks.values())
        for cb in callbacks:
            cb(workflow_ids=workflow_ids, new_state=new_state, step_ids=step_ids)

    @staticmethod
    def _get_step_id(task):
        """
        Return the step id echoed by a task's ``post_exec``, or ``None``.

        ``enact`` sets ``post_exec`` to ``echo ${SLURM_JOB_ID}.${SLURM_STEP_ID}``,
        so the id is expected to be the last whitespace-separated token of the
        task's stdout. When stdout is empty or missing, presumably because the
        task failed before ``post_exec`` ran, ``None`` is returned. Raising here
        would kill the monitor thread.
        """
        tokens = (task.stdout or "").split()
        return tokens[-1] if tokens else None

    def _monitor(self):
        """
        Body of the monitor thread.

        The loop runs until ``terminate`` sets ``self._terminate_monitor``. Each
        pass polls the RP task manager for the workflows in ``self._to_monitor``.
        A workflow whose RP task is in ``rp.FINAL`` is marked ``st.DONE`` with an
        end time, reported to the registered callbacks with its step id, and
        removed from the monitoring list. Between passes the thread waits
        ``self._monitor_interval`` seconds on the terminate event.
        """
        while not self._terminate_monitor.is_set():
            # Take a snapshot under the lock; enact() may append concurrently.
            with self._monitoring_lock:
                monitoring_list = list(self._to_monitor)

            if monitoring_list:
                workflows_executing = set(self._rp_tmgr.list_tasks())
                self._prof.prof("workflow_monitor_start", uid=self._uid)
                to_remove_wfs = list()
                to_remove_sids = list()

                for workflow_id in monitoring_list:
                    task_uid = f"workflow.{workflow_id}"
                    if task_uid not in workflows_executing:
                        continue
                    rp_workflow = self._rp_tmgr.get_tasks(uids=task_uid)
                    if rp_workflow.state not in rp.FINAL:
                        continue

                    # NOTE(review): every RP final state (including failed or
                    # canceled tasks) is reported as st.DONE — confirm intended.
                    step_id = self._get_step_id(rp_workflow)
                    with self._monitoring_lock:
                        self._logger.debug(f"workflow.{workflow_id} Done")
                        self._execution_status[workflow_id]["state"] = st.DONE
                        self._execution_status[workflow_id]["end_time"] = datetime.now()
                        self._logger.debug(
                            "Workflow %s finished: %s, step_id: %s",
                            workflow_id,
                            self._execution_status[workflow_id]["end_time"],
                            step_id,
                        )
                        to_remove_wfs.append(workflow_id)
                        to_remove_sids.append(step_id)
                    self._prof.prof("workflow_success", uid=self._uid)

                if to_remove_wfs:
                    self._notify_callbacks(
                        workflow_ids=to_remove_wfs,
                        new_state=st.DONE,
                        step_ids=to_remove_sids,
                    )
                    with self._monitoring_lock:
                        for wid in to_remove_wfs:
                            self._to_monitor.remove(wid)
                self._prof.prof("workflow_monitor_end", uid=self._uid)

            # Pause between polls instead of busy-spinning. The wait returns
            # immediately once terminate() sets the event.
            self._terminate_monitor.wait(self._monitor_interval)

    def get_status(self, workflows=None):
        """
        Get the state of a workflow or workflows.

        *Parameter*
        *workflows:* ``None`` for every enacted workflow, a list of workflow
        IDs, or a single workflow ID.

        *Returns*
        *status*: A dictionary mapping each requested workflow ID to its state.

        Raises ``KeyError`` for an ID that has not been enacted.
        """
        if workflows is None:
            workflow_ids = list(self._execution_status)
        elif isinstance(workflows, list):
            workflow_ids = workflows
        else:
            # A single workflow ID.
            workflow_ids = [workflows]

        return {wid: self._execution_status[wid]["state"] for wid in workflow_ids}

    def update_status(self, workflow, new_state):
        """
        Set the state of an already-enacted workflow.

        An unknown workflow is logged as a warning and otherwise ignored.
        """
        if workflow not in self._execution_status:
            self._logger.warning(
                "Has not enacted on workflow %s yet.",
                workflow,
            )
        else:
            self._execution_status[workflow]["state"] = new_state

    def terminate(self):
        """
        Stop the monitor thread and close the RP pilot manager and session.
        """
        self._logger.info("Start terminating procedure")
        self._prof.prof("str_terminating", uid=self._uid)
        if self._monitoring_thread:
            self._prof.prof("monitor_terminate", uid=self._uid)
            self._terminate_monitor.set()
            self._monitoring_thread.join()
            self._prof.prof("monitor_terminated", uid=self._uid)
        self._logger.debug("Monitor thread terminated")
        self._rp_pmgr.close(terminate=True)
        self._rp_session.close(terminate=True)
        self._logger.debug("Enactor thread terminated")

    def register_state_cb(self, cb):
        """
        Register a state-update callback, keyed by the callback's ``__name__``.

        A callback with the same name replaces the previous one. Callbacks are
        called with ``workflow_ids``, ``new_state`` and ``step_ids`` keyword
        arguments.
        """
        with self._cb_lock:
            cb_name = cb.__name__
            self._callbacks[cb_name] = cb
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc