• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

google / openhtf / 25279839580

03 May 2026 12:58PM UTC coverage: 63.302% (+0.1%) from 63.203%
25279839580

Pull #1281

github

copybara-github
Add PhaseGraph container to support concurrent execution.
See phase_graph_test.py for usage examples.

PiperOrigin-RevId: 904170357
Pull Request #1281: Add PhaseGraph container to support concurrent execution.

173 of 245 new or added lines in 5 files covered. (70.61%)

3 existing lines in 1 file now uncovered.

4935 of 7796 relevant lines covered (63.3%)

1.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.36
/openhtf/core/phase_executor.py
1
# Copyright 2014 Google Inc. All Rights Reserved.
2

3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6

7
#     http://www.apache.org/licenses/LICENSE-2.0
8

9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""PhaseExecutor module for handling the phases of a test.
15

16
Each phase is an instance of phase_descriptor.PhaseDescriptor and therefore has
17
relevant options. Each option is taken into account when executing a phase,
18
such as checking options.run_if as soon as possible and timing out at the
19
appropriate time.
20

21
A phase must return a phase_descriptor.PhaseResult, one of CONTINUE, REPEAT, or
22
STOP. A phase may also return None, or have no return statement, which is the
23
same as returning openhtf.PhaseResult.CONTINUE.  These results are then acted
24
upon accordingly and a new test run status is returned.
25

26
Phases are always run in order and not allowed to loop back, though a phase may
27
choose to repeat itself by returning REPEAT. Returning STOP will cause a test
28
to stop early, allowing a test to detect a bad state and not waste any further
29
time. A phase should not return TIMEOUT or ABORT, those are handled by the
30
framework.
31
"""
32

33
import pstats
3✔
34
import sys
3✔
35
import threading
3✔
36
import time
3✔
37
import traceback
3✔
38
import types
3✔
39
from typing import Any, Dict, Optional, Set, TYPE_CHECKING, Text, Tuple, Type, Union
3✔
40

41
import attr
3✔
42
from openhtf import util
3✔
43
from openhtf.core import phase_branches
3✔
44
from openhtf.core import phase_descriptor
3✔
45
from openhtf.core import test_record
3✔
46
from openhtf.util import argv
3✔
47
from openhtf.util import threads
3✔
48
from openhtf.util import timeouts
3✔
49

50
if TYPE_CHECKING:
51
  from openhtf.core import test_state as htf_test_state  # pylint: disable=g-import-not-at-top
52

53
# Default phase timeout in seconds; PhaseExecutorThread.join_or_die() falls
# back to it when a phase's options.timeout_s is None.
DEFAULT_PHASE_TIMEOUT_S = 3 * 60
3✔
54
# Number of seconds passed to each join() call made by join_or_die() while it
# polls the phase thread.
_JOIN_TRY_INTERVAL_SECONDS = 3
3✔
55

56
# Registers the --phase_default_timeout_s command line flag.
# NOTE(review): presumably argv.StoreInModule writes the parsed value into this
# module's DEFAULT_PHASE_TIMEOUT_S -- confirm against openhtf.util.argv.
ARG_PARSER = argv.module_parser()
3✔
57
ARG_PARSER.add_argument(
3✔
58
    '--phase_default_timeout_s',
59
    default=DEFAULT_PHASE_TIMEOUT_S,
60
    action=argv.StoreInModule,
61
    target='%s.DEFAULT_PHASE_TIMEOUT_S' % __name__,
62
    help='Test phase timeout in seconds')
63

64

65
@attr.s(slots=True, frozen=True)
class ExceptionInfo(object):
  """Immutable record of a raised exception: its type, value and traceback.

  The three attributes are in the same order as the triple returned by
  sys.exc_info(), so an instance can be built with
  ExceptionInfo(*sys.exc_info()).
  """

  exc_type = attr.ib(type=Type[BaseException])
  exc_val = attr.ib(type=BaseException)
  exc_tb = attr.ib(type=types.TracebackType)

  def get_traceback_string(self) -> Text:
    """Returns the formatted exception and traceback as a single string."""
    formatted_lines = traceback.format_exception(
        self.exc_type, self.exc_val, self.exc_tb)
    return ''.join(formatted_lines)

  def as_base_types(self) -> Dict[Text, Text]:
    """Returns the exception described purely with strings.

    Returns:
      A dict with the keys 'exc_type', 'exc_val' and 'exc_tb'; the first two
      are the str() of the matching attribute, the last is the formatted
      traceback.
    """
    return dict(
        exc_type=str(self.exc_type),
        exc_val=str(self.exc_val),
        exc_tb=self.get_traceback_string(),
    )

  def __str__(self) -> Text:
    """Returns only the name of the exception class."""
    return self.exc_type.__name__
×
86

87

88
class InvalidPhaseResultError(Exception):
3✔
89
  """Raised when a phase or checkpoint produces an invalid result.

  In this module it is raised when a phase returns something that is not a
  phase_descriptor.PhaseResult, and when a phase or checkpoint returns
  FAIL_SUBTEST while no subtest is running.
  """
90

91

92
@attr.s(slots=True, frozen=True)
class PhaseExecutionOutcome(object):
  """Internal description of how one execution of a phase ended.

  This should not be confused with openhtf.PhaseResult.  PhaseResult is the
  user-facing enumeration of valid phase return values, whereas this wrapper
  additionally tracks the outcomes a phase cannot return itself (timeout,
  exception, abort).

  The wrapped phase_result is one of:
    * None: the phase timed out, so there is no result (see is_timeout).
    * ExceptionInfo: the phase raised an exception (see raised_exception).
    * threads.ThreadTerminationError: the phase was killed (see is_aborted);
      this also counts as a raised exception.
    * phase_descriptor.PhaseResult: the value returned by the phase.
  """

  phase_result = attr.ib(type=Union[None, phase_descriptor.PhaseResult,
                                    ExceptionInfo,
                                    threads.ThreadTerminationError])

  @property
  def is_timeout(self):
    """True if this PhaseExecutionOutcome indicates a phase timeout."""
    return self.phase_result is None

  @property
  def is_aborted(self):
    """True if the phase thread was terminated."""
    return isinstance(self.phase_result, threads.ThreadTerminationError)

  @property
  def raised_exception(self):
    """True if the phase raised an exception or was terminated."""
    exception_kinds = (ExceptionInfo, threads.ThreadTerminationError)
    return isinstance(self.phase_result, exception_kinds)

  @property
  def is_terminal(self):
    """True if this result will stop the test."""
    if self.raised_exception or self.is_timeout:
      return True
    return self.phase_result is phase_descriptor.PhaseResult.STOP

  @property
  def is_repeat(self):
    """True if the phase returned REPEAT."""
    wanted = phase_descriptor.PhaseResult.REPEAT
    return self.phase_result is wanted

  @property
  def is_skip(self):
    """True if the phase result is SKIP."""
    wanted = phase_descriptor.PhaseResult.SKIP
    return self.phase_result is wanted

  @property
  def is_fail_and_continue(self):
    """True if the phase returned FAIL_AND_CONTINUE."""
    wanted = phase_descriptor.PhaseResult.FAIL_AND_CONTINUE
    return self.phase_result is wanted

  @property
  def is_fail_subtest(self):
    """True if the phase returned FAIL_SUBTEST."""
    wanted = phase_descriptor.PhaseResult.FAIL_SUBTEST
    return self.phase_result is wanted
152

153

154
class PhaseExecutorThread(threads.KillableThread):
  """Handles the execution and result of a single test phase.

  The phase outcome will be stored in the _phase_execution_outcome attribute
  once it is known (_phase_execution_outcome is None until then), and it will be
  a PhaseExecutionOutcome instance.
  """
  daemon = True

  def __init__(self, phase_desc: phase_descriptor.PhaseDescriptor,
               test_state: 'htf_test_state.TestState', run_with_profiling: bool,
               subtest_rec: Optional[test_record.SubtestRecord]):
    """Prepares a thread that will run phase_desc against test_state.

    Args:
      phase_desc: The phase to execute.
      test_state: State of the running test; it is passed to the phase when it
        is called and supplies the logger.
      run_with_profiling: Forwarded to the base thread class.
      subtest_rec: Record of the enclosing subtest, or None when the phase is
        not running inside a subtest.
    """
    super(PhaseExecutorThread, self).__init__(
        # Interpolate the phase name; the previous literal contained the text
        # "(phase_desc.name)" verbatim.
        name='<PhaseExecutorThread: ({})>'.format(phase_desc.name),
        run_with_profiling=run_with_profiling,
        logger=test_state.state_logger.getChild('phase_executor_thread'))
    self._phase_desc = phase_desc
    self._test_state = test_state
    self._subtest_rec = subtest_rec
    # Captured at construction time and re-installed by _thread_proc.
    self._phase_state = test_state.running_phase_state
    self._phase_execution_outcome = None  # type: Optional[PhaseExecutionOutcome]

  def _thread_proc(self) -> None:
    """Execute the encompassed phase and save the result.

    Raises:
      InvalidPhaseResultError: If the phase returns something other than a
        PhaseResult (or None), or returns FAIL_SUBTEST outside of a subtest.
    """
    self._test_state.running_phase_state = self._phase_state
    # Call the phase, save the return value, or default it to CONTINUE.
    phase_return = self._phase_desc(self._test_state)
    if phase_return is None:
      phase_return = phase_descriptor.PhaseResult.CONTINUE

    if not isinstance(phase_return, phase_descriptor.PhaseResult):
      raise InvalidPhaseResultError('Invalid phase result', phase_return)
    if (phase_return is phase_descriptor.PhaseResult.FAIL_SUBTEST and
        not self._subtest_rec):
      raise InvalidPhaseResultError(
          'Phase returned FAIL_SUBTEST but a subtest is not running.')
    self._phase_execution_outcome = PhaseExecutionOutcome(phase_return)

  def _log_exception(self, *args: Any) -> Any:
    """Log exception, while allowing unit testing to override."""
    self._test_state.state_logger.critical(*args)

  def _thread_exception(self, *args) -> bool:
    """Records an exception raised by the phase as the phase outcome.

    Args:
      *args: Forwarded unchanged to ExceptionInfo.

    Returns:
      Always True.
    """
    self._phase_execution_outcome = PhaseExecutionOutcome(ExceptionInfo(*args))
    self._log_exception('Phase %s raised an exception', self._phase_desc.name)
    return True  # Never propagate exceptions upward.

  def join_or_die(self) -> PhaseExecutionOutcome:
    """Wait for thread to finish, returning a PhaseExecutionOutcome instance.

    Waits until the phase finishes, the thread is killed, or the phase timeout
    (options.timeout_s, or DEFAULT_PHASE_TIMEOUT_S when that is None) expires.

    Returns:
      The outcome stored by the phase thread if there is one; an outcome
      wrapping None if the phase timed out (the thread is then killed); an
      outcome wrapping ThreadTerminationError if the thread ended without
      storing an outcome.
    """
    timeout_s = self._phase_desc.options.timeout_s
    if timeout_s is None:
      timeout_s = DEFAULT_PHASE_TIMEOUT_S
    deadline = time.monotonic() + timeout_s
    while True:
      remaining_s = deadline - time.monotonic()
      if remaining_s <= 0:
        break
      # Using exception to kill thread is not honored when thread is busy,
      # so we leave the thread behind, and move on teardown.
      # Never wait past the deadline, so that timeouts shorter than the poll
      # interval are honored.
      self.join(min(_JOIN_TRY_INTERVAL_SECONDS, remaining_s))
      if not self.is_alive() or self._killed.is_set():
        break

    # We got a return value or an exception and handled it.
    if self._phase_execution_outcome is not None:
      return self._phase_execution_outcome

    # Check for timeout, indicated by None for
    # PhaseExecutionOutcome.phase_result.
    if self.is_alive():
      self.kill()
      return PhaseExecutionOutcome(None)

    # The thread may have finished between the two checks above; prefer the
    # outcome it stored over reporting it as killed.
    if self._phase_execution_outcome is not None:
      return self._phase_execution_outcome

    # Phase was killed.
    return PhaseExecutionOutcome(threads.ThreadTerminationError())

  @property
  def name(self) -> Text:
    """The thread name; identical to str(self)."""
    return str(self)

  def __str__(self) -> Text:
    return '<{}: ({})>'.format(type(self).__name__, self._phase_desc.name)
3✔
232

233

234
class PhaseExecutor(object):
  """Encompasses the execution of the phases of a test.

  Each phase execution runs in its own PhaseExecutorThread.  A started thread
  stays in _active_phase_threads until it has been joined, which lets stop()
  kill every phase thread that is still running.
  """

  def __init__(self, test_state: 'htf_test_state.TestState'):
    """Creates an executor bound to test_state.

    Args:
      test_state: State of the running test; supplies the logger, the phase
        context managers and the test record used by this executor.
    """
    self.test_state = test_state
    self.logger = test_state.state_logger.getChild('phase_executor')
    # This lock exists to prevent stop() calls from being ignored if called when
    # _execute_phase_once is setting up the next phase thread.
    self._current_phase_thread_lock = threading.Lock()
    self._current_phase_thread = None  # type: Optional[PhaseExecutorThread]
    self._active_phase_threads = set()  # type: Set[PhaseExecutorThread]
    self._stopping = threading.Event()

  def _should_repeat(self, phase: phase_descriptor.PhaseDescriptor,
                     phase_execution_outcome: PhaseExecutionOutcome,
                     is_last_repeat: bool) -> bool:
    """Returns whether a phase should be repeated.

    Args:
      phase: The phase that was just executed.
      phase_execution_outcome: Outcome of that execution.
      is_last_repeat: Whether the execution was the last one permitted by the
        repeat limit.

    Returns:
      True if the phase options and the outcome ask for another execution.
      The caller is responsible for enforcing the repeat limit.
    """
    if phase_execution_outcome.is_timeout and phase.options.repeat_on_timeout:
      return True
    elif phase_execution_outcome.is_repeat:
      return True
    elif phase.options.force_repeat:
      return True
    elif phase.options.repeat_on_measurement_fail:
      phase_records = self.test_state.test_record.phases
      if not phase_records:
        # _execute_phase_once returns before a phase record is created when
        # run_if raises or returns falsey, so there may be nothing to inspect.
        return False
      last_phase_record = phase_records[-1]
      last_repeat_failed = (
          last_phase_record.outcome == test_record.PhaseOutcome.FAIL)
      if last_repeat_failed and not is_last_repeat:
        # The failed attempt is about to be repeated, so its record is
        # relabelled as skipped.
        last_phase_record.outcome = test_record.PhaseOutcome.SKIP
      return last_repeat_failed
    return False

  def execute_phase(
      self,
      phase: phase_descriptor.PhaseDescriptor,
      run_with_profiling: bool = False,
      subtest_rec: Optional[test_record.SubtestRecord] = None
  ) -> Tuple[PhaseExecutionOutcome, Optional[pstats.Stats]]:
    """Executes a phase or skips it, returning the final PhaseExecutionOutcome.

    Args:
      phase: Phase to execute.
      run_with_profiling: Whether to run with cProfile stat collection for the
        phase code run inside a thread.
      subtest_rec: Optional subtest record.

    Returns:
      A two-tuple; the first item is the final PhaseExecutionOutcome that wraps
      the phase return value (or exception) of the final phase run. All
      intermediary results, if any, are REPEAT and handled internally. Returning
      REPEAT here means the phase hit its limit for repetitions.
      The second tuple item is the profiler Stats object if profiling was
      requested and successfully ran for this phase execution.
      If the executor is stopping, a timeout outcome and None are returned.
    """
    repeat_count = 1
    repeat_limit = (
        phase.options.repeat_limit or phase_descriptor.DEFAULT_REPEAT_LIMIT
    )
    while not self._stopping.is_set():
      is_last_repeat = repeat_count >= repeat_limit
      phase_execution_outcome, profile_stats = self._execute_phase_once(
          phase, is_last_repeat, run_with_profiling, subtest_rec)
      # _should_repeat is always called, even on the last repeat, because it
      # inspects (and may update) the latest phase record.
      if (self._should_repeat(phase, phase_execution_outcome, is_last_repeat) and
          not is_last_repeat):
        repeat_count += 1
        continue

      return phase_execution_outcome, profile_stats
    # We've been cancelled, so just 'timeout' the phase.
    return PhaseExecutionOutcome(None), None

  def _execute_phase_once(
      self,
      phase_desc: phase_descriptor.PhaseDescriptor,
      is_last_repeat: bool,
      run_with_profiling: bool,
      subtest_rec: Optional[test_record.SubtestRecord],
  ) -> Tuple[PhaseExecutionOutcome, Optional[pstats.Stats]]:
    """Executes the given phase once, returning a PhaseExecutionOutcome.

    Args:
      phase_desc: Phase to execute.
      is_last_repeat: Whether this execution is the last one permitted by the
        repeat limit; a REPEAT result is then turned into STOP.
      run_with_profiling: Whether profile stats should be collected and
        returned.
      subtest_rec: Optional subtest record.

    Returns:
      A two-tuple of the outcome of this execution and the profile stats (None
      unless run_with_profiling is set and the phase thread was run).
    """
    # Check this before we create a PhaseState and PhaseRecord.
    if phase_desc.options.run_if:
      try:
        run_phase = phase_desc.options.run_if()
      except Exception:  # pylint: disable=broad-except
        self.logger.debug('Phase %s stopped due to a fault in run_if function.',
                          phase_desc.name)
        # Allow graceful termination
        return PhaseExecutionOutcome(ExceptionInfo(*sys.exc_info())), None

      if not run_phase:
        self.logger.debug('Phase %s skipped due to run_if returning falsey.',
                          phase_desc.name)
        return PhaseExecutionOutcome(phase_descriptor.PhaseResult.SKIP), None

    override_result = None
    # Phases whose id() is listed in test_state._concurrent_nodes use the
    # concurrent phase context; all others use the regular one.
    if id(phase_desc) in getattr(self.test_state, '_concurrent_nodes', set()):
      ctx_mgr = self.test_state.concurrent_running_phase_context
    else:
      ctx_mgr = self.test_state.running_phase_context
    with ctx_mgr(phase_desc) as phase_state:
      if subtest_rec:
        self.logger.debug('Executing phase %s under subtest %s (from %s)',
                          phase_desc.name, phase_desc.func_location,
                          subtest_rec.name)
        phase_state.set_subtest_name(subtest_rec.name)
      else:
        self.logger.debug('Executing phase %s (from %s)', phase_desc.name,
                          phase_desc.func_location)
      with self._current_phase_thread_lock:
        # Checking _stopping must be in the lock context, otherwise there is a
        # race condition: this thread checks _stopping and then switches to
        # another thread where stop() sets _stopping and checks
        # _current_phase_thread (which would not be set yet).  In that case, the
        # new phase thread will be still be started.
        if self._stopping.is_set():
          # PhaseRecord will be written at this point, so ensure that it has a
          # Killed result.
          result = PhaseExecutionOutcome(threads.ThreadTerminationError())
          phase_state.result = result
          return result, None
        phase_thread = PhaseExecutorThread(phase_desc, self.test_state,
                                           run_with_profiling, subtest_rec)
        phase_thread.start()
        self._current_phase_thread = phase_thread
        self._active_phase_threads.add(phase_thread)

      phase_state.result = phase_thread.join_or_die()
      if phase_state.result.is_repeat and is_last_repeat:
        self.logger.error('Phase returned REPEAT, exceeding repeat_limit.')
        phase_state.hit_repeat_limit = True
        override_result = PhaseExecutionOutcome(
            phase_descriptor.PhaseResult.STOP)
      with self._current_phase_thread_lock:
        self._active_phase_threads.discard(phase_thread)
        if self._current_phase_thread == phase_thread:
          self._current_phase_thread = None

    # Refresh the result in case a validation for a partially set measurement
    # or phase diagnoser raised an exception.
    result = override_result or phase_state.result
    self.logger.debug('Phase %s finished with result %r', phase_desc.name,
                      result.phase_result)
    return (result,
            phase_thread.get_profile_stats() if run_with_profiling else None)

  def skip_phase(self, phase_desc: phase_descriptor.PhaseDescriptor,
                 subtest_rec: Optional[test_record.SubtestRecord]) -> None:
    """Skip a phase, but log a record of it.

    Args:
      phase_desc: Phase to skip.
      subtest_rec: Optional subtest record; its name is set on the phase state.
    """
    self.logger.debug('Automatically skipping phase %s', phase_desc.name)
    with self.test_state.running_phase_context(phase_desc) as phase_state:
      if subtest_rec:
        phase_state.set_subtest_name(subtest_rec.name)
      phase_state.result = PhaseExecutionOutcome(
          phase_descriptor.PhaseResult.SKIP)

  def evaluate_checkpoint(
      self, checkpoint: phase_branches.Checkpoint,
      subtest_rec: Optional[test_record.SubtestRecord]
  ) -> PhaseExecutionOutcome:
    """Evaluate a checkpoint, returning a PhaseExecutionOutcome.

    Any exception raised while evaluating the checkpoint, including the
    InvalidPhaseResultError raised here for FAIL_SUBTEST outside of a subtest,
    is captured in the returned outcome instead of propagating.

    Args:
      checkpoint: Checkpoint to evaluate.
      subtest_rec: Optional subtest record.

    Returns:
      The outcome of the checkpoint; a matching CheckpointRecord is added to
      the test record.
    """
    if subtest_rec:
      subtest_name = subtest_rec.name
      self.logger.debug('Evaluating checkpoint %s under subtest %s',
                        checkpoint.name, subtest_name)
    else:
      self.logger.debug('Evaluating checkpoint %s', checkpoint.name)
      subtest_name = None
    evaluated_millis = util.time_millis()
    try:
      outcome = PhaseExecutionOutcome(checkpoint.get_result(self.test_state,
                                                            subtest_rec))
      self.logger.debug('Checkpoint %s result: %s', checkpoint.name,
                        outcome.phase_result)
      if outcome.is_fail_subtest and not subtest_rec:
        raise InvalidPhaseResultError(
            'Checkpoint returned FAIL_SUBTEST, but subtest not running.')
    except Exception:  # pylint: disable=broad-except
      outcome = PhaseExecutionOutcome(ExceptionInfo(*sys.exc_info()))

    checkpoint_rec = test_record.CheckpointRecord.from_checkpoint(
        checkpoint, subtest_name, outcome, evaluated_millis)

    self.test_state.test_record.add_checkpoint_record(checkpoint_rec)

    return outcome

  def skip_checkpoint(self, checkpoint: phase_branches.Checkpoint,
                      subtest_rec: Optional[test_record.SubtestRecord]) -> None:
    """Skip a checkpoint, but log a record of it.

    Args:
      checkpoint: Checkpoint to skip.
      subtest_rec: Optional subtest record.
    """
    self.logger.debug('Automatically skipping checkpoint %s', checkpoint.name)
    subtest_name = subtest_rec.name if subtest_rec else None
    checkpoint_rec = test_record.CheckpointRecord.from_checkpoint(
        checkpoint, subtest_name,
        PhaseExecutionOutcome(phase_descriptor.PhaseResult.SKIP),
        util.time_millis())
    self.test_state.test_record.add_checkpoint_record(checkpoint_rec)

  def reset_stop(self) -> None:
    """Clears the stopping flag set by stop()."""
    self._stopping.clear()

  def stop(
      self,
      timeout_s: Union[None, int, float,
                       timeouts.PolledTimeout] = None) -> None:
    """Stops execution of all currently running phases, if any.

    Every active phase thread that is still alive is killed, which will cause
    the test to stop executing and terminate with an ERROR state.

    Args:
      timeout_s: Number of seconds (int or float), a PolledTimeout, or None;
        converted with PolledTimeout.from_seconds and used as a single timeout
        shared by the waits on all killed phase threads.
    """
    self._stopping.set()
    # Snapshot under the lock; the killing and waiting happen outside of it.
    with self._current_phase_thread_lock:
      threads_to_kill = list(self._active_phase_threads)

    timeout = timeouts.PolledTimeout.from_seconds(timeout_s)
    # Kill every thread first, then wait, so the threads shut down in parallel.
    for phase_thread in threads_to_kill:
      if phase_thread.is_alive():
        phase_thread.kill()

    for phase_thread in threads_to_kill:
      if phase_thread.is_alive():
        self.logger.debug(
            'Waiting for cancelled phase to exit: %s', phase_thread
        )
        while phase_thread.is_alive() and not timeout.has_expired():
          time.sleep(0.1)
        self.logger.debug(
            'Cancelled phase %s exit',
            "didn't" if phase_thread.is_alive() else 'did',
        )
    # Clear the currently running phase, whether it finished or timed out.
    self.test_state.stop_running_phase()
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc