• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

simonsobs / ocs / 10634556869

29 Aug 2024 06:23PM UTC coverage: 67.041%. First build
10634556869

push

github

web-flow
Merge pull request #401 from simonsobs/koopman/agent-fixture-timeout-class

Interrupt agent within pytest fixture if it crashes during testing

32 of 45 new or added lines in 1 file covered. (71.11%)

2626 of 3917 relevant lines covered (67.04%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.49
/ocs/testing.py
1
import os
1✔
2
import time
1✔
3
import pytest
1✔
4
import signal
1✔
5
import subprocess
1✔
6
import coverage.data
1✔
7
import urllib.request
1✔
8

9
from threading import Timer
1✔
10
from urllib.error import URLError
1✔
11

12
from ocs.ocs_client import OCSClient
1✔
13

14

15
# Seconds to wait for an agent subprocess to exit (and for its output to be
# collected) once it has been signalled during shutdown.
SIGINT_TIMEOUT = 10
1✔
16

17

18
class _AgentRunner:
    """Class to manage running an agent as a subprocess during testing.

    The agent is launched under ``coverage run`` so that coverage from the
    subprocess is written to ``.coverage.agent.<agent_name>``. A watchdog
    timer kills the subprocess if it is still running after the timeout
    passed to :meth:`run`.

    Parameters:
        agent_path (str): Relative path to Agent,
            i.e. '../agents/fake_data/fake_data_agent.py'
        agent_name (str): Short, unique name for the agent
        args (list): Additional CLI arguments to add when starting the Agent

    """

    def __init__(self, agent_path, agent_name, args):
        self.env = os.environ.copy()
        self.env['COVERAGE_FILE'] = f'.coverage.agent.{agent_name}'
        self.env['OCS_CONFIG_DIR'] = os.getcwd()
        self.cmd = [
            'python',
            '-u',
            '-m',
            'coverage',
            'run',
            '--rcfile=./.coveragerc',
            agent_path,
            '--site-file',
            './default.yaml'
        ]
        if args is not None:
            self.cmd.extend(args)
        self.agent_name = agent_name
        self.proc = None
        self._timer = None
        self._timedout = False

    def run(self, timeout):
        """Run the agent subprocess.

        This runs the agent subprocess defined by ``self.cmd``. Output is
        written to a ``PIPE``. If the agent does not exit within the given
        timeout it will be interrupted with a ``SIGKILL``.

        Parameters:
            timeout (float): Timeout in seconds to wait for agent to exit.

        Raises:
            RuntimeError: If the subprocess has already exited one second
                after being started.

        """
        # start_new_session=True makes the child call setsid(), same as the
        # previous preexec_fn=os.setsid, without running Python code between
        # fork and exec.
        self.proc = subprocess.Popen(self.cmd,
                                     env=self.env,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE,
                                     text=True,
                                     start_new_session=True)

        # start timer for if agent crashes and hangs
        self._timer = Timer(timeout, self._interrupt)
        self._timer.start()

        # Wait briefly then make sure subprocess hasn't already exited.
        time.sleep(1)
        if self.proc.poll() is not None:
            self._timer.cancel()
            self._raise_subprocess(f"Agent failed to startup, cmd: {self.cmd}")

    def _interrupt(self):
        """Watchdog callback: forcibly kill the agent subprocess."""
        # Record the timeout before signalling so shutdown() never sees a
        # killed process that is not yet flagged as timed out.
        self._timedout = True
        # not graceful, but handles really misbehaved agent subprocesses
        self.proc.send_signal(signal.SIGKILL)

    def _raise_subprocess(self, msg):
        """Print the agent's captured output, then raise ``RuntimeError``.

        Parameters:
            msg (str): Message for the raised exception.

        Raises:
            RuntimeError: Always, with ``msg`` as its message.

        """
        # A blocking read on the pipes never returns while the child is
        # still alive (e.g. it ignored SIGINT), so make sure it is dead
        # before collecting its output.
        if self.proc.poll() is None:
            self.proc.kill()
        stdout, stderr = self.proc.communicate()
        print(f'Here is stdout from {self.agent_name}:\n{stdout}')
        print(f'Here is stderr from {self.agent_name}:\n{stderr}')
        raise RuntimeError(msg)

    def shutdown(self):
        """Shutdown the agent process.

        If the agent does not respond to a ``SIGINT`` then it is killed, its
        output is printed, and an exception raised. An exception is also
        raised if the agent had already been killed by the watchdog timer.

        Raises:
            RuntimeError: If the agent did not exit within
                ``SIGINT_TIMEOUT`` seconds, or if it was killed because it
                exceeded the timeout given to :meth:`run`.

        """
        # don't send SIGINT if we've already sent SIGKILL
        if not self._timedout:
            self.proc.send_signal(signal.SIGINT)
        self._timer.cancel()

        try:
            stdout, stderr = self.proc.communicate(timeout=SIGINT_TIMEOUT)
        except subprocess.TimeoutExpired:
            self._raise_subprocess('Agent did not terminate within '
                                   f'{SIGINT_TIMEOUT} seconds on SIGINT.')

        if self._timedout:
            # Output was already collected by communicate() above.
            print(f'Here is stdout from {self.agent_name}:\n{stdout}')
            print(f'Here is stderr from {self.agent_name}:\n{stderr}')
            raise RuntimeError('Agent timed out.')
×
113

114

115
def create_agent_runner_fixture(agent_path, agent_name, args=None, timeout=60):
    """Build a pytest fixture that runs an OCS Agent around a test.

    The fixture starts the Agent before the test body, shuts it down once the
    test has finished, and then merges the coverage data recorded by the
    Agent subprocess into the session's coverage data.

    Parameters:
        agent_path (str): Relative path to Agent,
            i.e. '../agents/fake_data/fake_data_agent.py'
        agent_name (str): Short, unique name for the agent
        args (list): Additional CLI arguments to add when starting the Agent
        timeout (float): Number of seconds the agent process may run before
            it is interrupted, which typically indicates a crash within the
            agent. Pick a value longer than the agent is expected to run for
            during a given test. Defaults to 60 seconds.

    Returns:
        The ``run_agent`` pytest fixture function.

    """
    coverage_basename = f'.coverage.agent.{agent_name}'

    @pytest.fixture()
    def run_agent(cov):
        agent = _AgentRunner(agent_path, agent_name, args)
        agent.run(timeout=timeout)

        yield

        agent.shutdown()

        # Load the coverage data written by the agent subprocess.
        agent_data = coverage.data.CoverageData(basename=coverage_basename)
        agent_data.read()

        # Nothing to merge into when the --cov flag is missing.
        if cov is None:
            return
        cov.get_data().update(agent_data)

    return run_agent
1✔
147

148

149
def create_client_fixture(instance_id, timeout=30):
    """Build a pytest fixture that hands tests a connected Client object.

    Parameters:
        instance_id (str): Agent instance-id to connect the Client to
        timeout (int): Approximate timeout in seconds for the connection.
            Up to this many connection attempts are made, pausing 1 second
            after each failed attempt. Useful when the Agent needs some time
            before it starts accepting connections, which varies depending on
            the Agent.

    Returns:
        The ``client_fixture`` pytest fixture function.

    """
    @pytest.fixture()
    def client_fixture():
        # Point OCS_CONFIG_DIR at the working directory so the local
        # default.yaml file is the one that gets read.
        config_dir = os.getcwd()
        os.environ['OCS_CONFIG_DIR'] = config_dir
        print(config_dir)

        tries = 0
        while tries < timeout:
            try:
                return OCSClient(instance_id)
            except RuntimeError as err:
                print(f"Caught error: {err}")
                print("Attempting to reconnect.")

            time.sleep(1)
            tries += 1

        raise RuntimeError(
            f"Failed to connect to {instance_id} after {timeout} attempts.")

    return client_fixture
1✔
183

184

185
def check_crossbar_connection(port=18001, interval=5, max_attempts=6):
    """Check that the crossbar server is up and available for an Agent to
    connect to.

    The ``/info`` endpoint is requested until it answers with HTTP status 200
    or ``max_attempts`` requests have been made, whichever comes first.

    Parameters:
        port (int): Port the crossbar server is configured to run on for
            testing.
        interval (float): Amount of time in seconds to wait between checks.
        max_attempts (int): Maximum number of attempts before giving up.

    Raises:
        RuntimeError: If no attempt received a 200 response.

    Notes:
        For this check to work the crossbar server needs the `Node Info Service
        <https://crossbar.io/docs/Node-Info-Service/>`_ running at the path
        /info.

    """
    url = f"http://localhost:{port}/info"
    attempts = 0

    while attempts < max_attempts:
        attempts += 1
        try:
            # Close the response promptly; only the status code is needed.
            with urllib.request.urlopen(url) as response:
                code = response.getcode()
        except (URLError, ConnectionResetError):
            code = None

        if code == 200:
            print("Crossbar server online.")
            return

        # Only wait if another attempt is going to follow.
        if attempts < max_attempts:
            print("Crossbar server not online yet, "
                  f"waiting {interval} seconds.")
            time.sleep(interval)

    raise RuntimeError(
        f"Crossbar server not available at {url} "
        f"after {attempts} attempts.")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc