• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

google / openhtf / 25279839580

03 May 2026 12:58PM UTC coverage: 63.302% (+0.1%) from 63.203%
25279839580

Pull #1281

github

copybara-github
Add PhaseGraph container to support concurrent execution.
See phase_graph_test.py for usage examples.

PiperOrigin-RevId: 904170357
Pull Request #1281: Add PhaseGraph container to support concurrent execution.

173 of 245 new or added lines in 5 files covered. (70.61%)

3 existing lines in 1 file now uncovered.

4935 of 7796 relevant lines covered (63.3%)

1.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/openhtf/core/test_executor.py
1
# Copyright 2014 Google Inc. All Rights Reserved.
2

3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6

7
#     http://www.apache.org/licenses/LICENSE-2.0
8

9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""TestExecutor executes tests."""
15

16
import concurrent.futures
3✔
17
import contextlib
3✔
18
import enum
3✔
19
import logging
3✔
20
import multiprocessing
3✔
21
import pstats
3✔
22
import sys
3✔
23
import tempfile
3✔
24
import threading
3✔
25
import traceback
3✔
26
from typing import Iterator, List, Optional, TYPE_CHECKING, Text, Type
3✔
27

28
from openhtf import util
3✔
29
from openhtf.core import base_plugs
3✔
30
from openhtf.core import diagnoses_lib
3✔
31
from openhtf.core import phase_branches
3✔
32
from openhtf.core import phase_collections
3✔
33
from openhtf.core import phase_descriptor
3✔
34
from openhtf.core import phase_executor
3✔
35
from openhtf.core import phase_graph
3✔
36
from openhtf.core import phase_group
3✔
37
from openhtf.core import phase_nodes
3✔
38
from openhtf.core import test_record
3✔
39
from openhtf.core import test_state
3✔
40
from openhtf.util import configuration
3✔
41
from openhtf.util import threads
3✔
42

43
CONF = configuration.CONF

if TYPE_CHECKING:
  from openhtf.core import test_descriptor  # pylint: disable=g-import-not-at-top

_LOG = logging.getLogger(__name__)

# NOTE: adjacent string literals are concatenated verbatim, so each fragment
# must carry its own trailing space.
CONF.declare(
    'cancel_timeout_s',
    default_value=2,
    description='Timeout (in seconds) when the test has been cancelled '
    'to wait for the running phase to exit.')

CONF.declare(
    'stop_on_first_failure',
    default_value=False,
    description='Stop current test execution and return Outcome FAIL '
    'on first phase with failed measurement.')
61

62

63
class TestExecutionError(Exception):
  """Signals an internal error encountered while executing a test."""
65

66

67
class TestStopError(Exception):
  """Signals that the test is being stopped (or has no running state)."""
69

70

71
class _ExecutorReturn(enum.Enum):
  """How execution should proceed after running a node.

  The numeric values are ordered by severity; _more_critical compares them.
  """

  # Keep running the following nodes.
  CONTINUE = 0
  # Stop running further (non-teardown) nodes.
  TERMINAL = 1
74

75

76
def _more_critical(e1: _ExecutorReturn, e2: _ExecutorReturn) -> _ExecutorReturn:
  """Returns whichever of the two results has the higher severity value."""
  # Enum members are singletons, so returning the member directly is the same
  # as looking it up again by its value.
  return e1 if e1.value >= e2.value else e2
78

79

80
def combine_profile_stats(profile_stats_iter: List[pstats.Stats],
                          output_filename: Text) -> None:
  """Given an iterable of pstats.Stats, combine them into a single Stats.

  Each Stats object is dumped to its own temporary file, the files are loaded
  together into one pstats.Stats, and the merged result is written to
  output_filename. The temporary files are removed before returning.

  Args:
    profile_stats_iter: The per-phase Stats objects to merge.
    output_filename: Path the combined stats are dumped to. Nothing is written
      when profile_stats_iter is empty.
  """
  import os  # pylint: disable=g-import-not-at-top

  profile_stats_filenames = []
  try:
    for profile_stats in profile_stats_iter:
      # delete=False so the file survives the `with` and can be reopened by
      # name by dump_stats() after the handle is closed.
      with tempfile.NamedTemporaryFile(delete=False) as f:
        profile_stats_filename = f.name
      profile_stats_filenames.append(profile_stats_filename)
      profile_stats.dump_stats(profile_stats_filename)
    if profile_stats_filenames:
      pstats.Stats(*profile_stats_filenames).dump_stats(output_filename)
  finally:
    # The temp files were created with delete=False, so remove them here;
    # otherwise every profiled run leaks one file per phase.
    for profile_stats_filename in profile_stats_filenames:
      try:
        os.remove(profile_stats_filename)
      except OSError:
        # Best-effort cleanup; a leftover temp file must not mask the result
        # (or the original error) of combining the stats.
        pass
91

92

93
# pylint: disable=too-many-instance-attributes
94
class TestExecutor(threads.KillableThread):
3✔
95
  """Encompasses the execution of a single test."""
96
  daemon = True
3✔
97

98
  def __init__(self, test_descriptor: 'test_descriptor.TestDescriptor',
               execution_uid: Text,
               test_start: Optional[phase_descriptor.PhaseDescriptor],
               test_options: 'test_descriptor.TestOptions',
               run_phases_with_profiling: bool):
    """Stores the test configuration and creates the executor's sync objects.

    Args:
      test_descriptor: Descriptor of the test to execute.
      execution_uid: Unique id of this execution; exposed as `uid`.
      test_start: Optional start trigger phase run before the other phases.
      test_options: Options for the test (e.g. default DUT id, diagnosers).
      run_phases_with_profiling: Whether phases are executed with profiling.
    """
    super().__init__(name='TestExecutorThread')
    self.test_state = None  # type: Optional[test_state.TestState]
    self._run_phases_with_profiling = run_phases_with_profiling
    self._test_descriptor = test_descriptor
    self._test_start = test_start
    self._test_options = test_options
    # Guards the hand-off of self._phase_exec between threads.
    self._lock = threading.Lock()
    self._phase_exec = None  # type: Optional[phase_executor.PhaseExecutor]
    self.uid = execution_uid
    # First terminal outcome seen and the name of the unit that produced it.
    self._last_outcome = None  # type: Optional[phase_executor.PhaseExecutionOutcome]
    self._last_execution_unit = None  # type: Optional[str]
    # Set by the first abort() call.
    self._abort = threading.Event()
    # Set by a repeated abort() call; also stops teardown sequences.
    self._full_abort = threading.Event()
    # Set once _thread_proc has finished; wait() blocks on it.
    self._execution_finished = threading.Event()
    # This is a reentrant lock so that the teardown logic that prevents aborts
    # affects nested sequences.
    self._teardown_phases_lock = threading.RLock()
    # Populated if profiling is enabled.
    self._phase_profile_stats = []  # type: List[pstats.Stats]
122

123
  @property
  def running_test_state(self) -> test_state.TestState:
    """The current TestState.

    Raises:
      TestStopError: If no test state has been created.
    """
    state = self.test_state
    if state is None:
      raise TestStopError('Test stopped.')
    return state
128

129
  @property
  def phase_executor(self) -> phase_executor.PhaseExecutor:
    """The current PhaseExecutor.

    Raises:
      TestStopError: If no phase executor has been created.
    """
    executor = self._phase_exec
    if executor is None:
      raise TestStopError('Test stopped.')
    return executor
134

135
  @property
  def logger(self) -> logging.Logger:
    """The `state_logger` of the running test state."""
    state = self.running_test_state
    return state.state_logger
138

139
  @property
  def phase_profile_stats(self) -> List[pstats.Stats]:
    """The profiling Stats objects collected so far, one per profiled phase."""
    collected = self._phase_profile_stats
    return collected
143

144
  def close(self) -> None:
    """Waits for execution to finish, then closes the test state.

    Always call this function when finished with this instance.

    This is an explicit method rather than __del__ because Python calls
    __del__ unreliably.
    """
    self.wait()
    state = self.running_test_state
    state.close()
154

155
  def abort(self) -> None:
    """Abort this test.

    The first call marks the test as aborted and stops the running phase
    unless teardown phases are in progress. Any later call escalates to a
    full abort and forcibly stops the phase executor.
    """
    if not self._abort.is_set():
      _LOG.error('Abort test executor.')
      # Deterministically mark the test as aborted.
      self._abort.set()
      self._stop_phase_executor()
      # No need to kill this thread because the abort state has been set, it
      # will end as soon as all queued teardown phases are run.
      return

    _LOG.error('Abort already set; forcibly stopping the process.')
    self._full_abort.set()
    self._stop_phase_executor(force=True)
168

169
  def finalize(self) -> test_state.TestState:
    """Finalize test execution and return the resulting test state.

    Should only be called once at the conclusion of a test run. If the DUT ID
    was never set, the default DUT ID from the test options is used.

    Returns:
      Finalized TestState.  It must not be modified after this call.

    Raises:
      TestStopError: If the test is already stopped or never ran.
    """
    state = self.test_state
    if not state:
      raise TestStopError('Test Stopped.')

    record = state.test_record
    if record.dut_id is None:
      _LOG.warning('DUT ID is still not set; using default.')
      record.dut_id = self._test_options.default_dut_id
    return state
187

188
  def wait(self) -> None:
    """Waits until death."""
    seconds_in_a_year = 31557600
    # Must use a timeout here in case this is called from the main thread.
    # Otherwise, the SIGINT abort logic in test_descriptor will not get called.
    # TIMEOUT_MAX can be too large and cause overflows on 32-bit OSes, so take
    # whichever timeout is shorter.
    timeout = min(threading.TIMEOUT_MAX, seconds_in_a_year)
    # This function is expected to be called twice in the case of a SIGINT,
    # and in Python 3.12 the second call would always return immediately,
    # preventing a clean exit (see `execute` in test_descriptor.py). Instead,
    # we wait on an Event that we control.
    self._execution_finished.wait(timeout)
    self.join()
204

205
  def _thread_proc(self) -> None:
    """Handles one whole test from start to finish.

    Creates the test state and phase executor, runs the optional start
    trigger, initializes plugs, executes the phase sequence and the test
    diagnosers, and finally runs test teardown. `_execution_finished` is set
    when this method exits, whether or not an error occurred.
    """
    self._execution_finished.clear()
    try:
      # Top level steps required to run a single iteration of the Test.
      self.test_state = test_state.TestState(self._test_descriptor, self.uid,
                                             self._test_options)
      phase_exec = phase_executor.PhaseExecutor(self.test_state)

      # Publish the phase executor under the lock; _stop_phase_executor (called
      # by abort()) reads self._phase_exec under the same lock.
      with self._lock:
        self._phase_exec = phase_exec

      if self._test_start is not None and self._execute_test_start():
        # Exit early if test_start returned a terminal outcome of any kind.
        return
      self.test_state.mark_test_started()

      # Full plug initialization happens _after_ the start trigger, as close to
      # test execution as possible, for the best chance of test equipment being
      # in a known-good state at the start of test execution.
      if self._initialize_plugs():
        return

      # Everything is set, set status and begin test execution.
      self.test_state.set_status_running()
      self._execute_node(self._test_descriptor.phase_sequence, None, False)
      self._execute_test_diagnosers()
    except:  # pylint: disable=bare-except
      stacktrace = traceback.format_exc()
      _LOG.error('Error in TestExecutor: \n%s', stacktrace)
      raise
    finally:
      try:
        self._execute_test_teardown()
      finally:
        # Teardown itself can raise (e.g. when the test state was never
        # created); always signal completion so wait() does not block.
        self._execution_finished.set()
240

241
  def _initialize_plugs(
      self,
      plug_types: Optional[List[Type[base_plugs.BasePlug]]] = None) -> bool:
    """Initialize plugs, recording a failure outcome if that raises.

    Args:
      plug_types: optional list of plug classes to initialize.

    Returns:
      True if there was an error initializing the plugs.
    """
    try:
      manager = self.running_test_state.plug_manager
      manager.initialize_plugs(plug_types=plug_types)
    except Exception:  # pylint: disable=broad-except
      # Record the equivalent failure outcome and exit early.
      exc_details = phase_executor.ExceptionInfo(*sys.exc_info())
      self._last_outcome = phase_executor.PhaseExecutionOutcome(exc_details)
      self._last_execution_unit = 'Plugs Initialization'
      return True
    return False
263

264
  def _execute_test_start(self) -> bool:
    """Run the start trigger phase, and check that the DUT ID is set after.

    Only the plugs used by the trigger phase are initialized here; the
    remaining plugs are initialized by the caller after this phase has run.
    Logs a warning if the start trigger failed to set the DUT ID.

    Returns:
      True if there was a terminal error either setting up or running the test
      start phase.

    Raises:
      TestStopError: If no test start phase is configured.
    """
    trigger = self._test_start
    if trigger is None:
      raise TestStopError('Test stopped.')

    # Partial plug initialization: just the plugs the trigger phase needs.
    trigger_plug_types = [phase_plug.cls for phase_plug in trigger.plugs]
    if self._initialize_plugs(plug_types=trigger_plug_types):
      return True

    # Have the phase executor run the start trigger phase.
    outcome, profile_stats = self.phase_executor.execute_phase(
        trigger, self._run_phases_with_profiling
    )
    if profile_stats is not None:
      self._phase_profile_stats.append(profile_stats)

    if outcome.is_terminal:
      self._last_outcome = outcome
      self._last_execution_unit = 'TestStart'
      return True

    if self.running_test_state.test_record.dut_id is None:
      _LOG.warning('Start trigger did not set a DUT ID.')
    return False
301

302
  def _stop_phase_executor(self, force: bool = False) -> None:
    """Stops the running phase, then resets the executor for teardown phases.

    Args:
      force: If True, stop the phase executor even while teardown phases hold
        the teardown lock. If False, do nothing when that lock is held.
    """
    with self._lock:
      phase_exec = self._phase_exec
    if not phase_exec:
      # The test executor has not started yet, so no stopping is required.
      return

    acquired_teardown_lock = False
    if not force:
      # Non-blocking acquire: a failure means the lock is held elsewhere.
      acquired_teardown_lock = self._teardown_phases_lock.acquire(False)
      if not acquired_teardown_lock:
        # If locked, teardown phases are running, so do not cancel those.
        return

    try:
      phase_exec.stop(timeout_s=CONF.cancel_timeout_s)
      # Resetting so phase_exec can run teardown phases.
      phase_exec.reset_stop()
    finally:
      if acquired_teardown_lock:
        self._teardown_phases_lock.release()
318

319
  def _execute_test_teardown(self) -> None:
    """Tears down the plugs and finalizes the test state.

    The state is finalized as aborted if abort() was called, from the recorded
    outcome if it is terminal, and normally otherwise.
    """
    state = self.running_test_state
    # Plug teardown does not affect the test outcome.
    state.plug_manager.tear_down_plugs()

    # Now finalize the test state.
    if self._abort.is_set():
      self.logger.debug('Finishing test with outcome ABORTED.')
      state.abort()
      return

    last_outcome = self._last_outcome
    if last_outcome and last_outcome.is_terminal:
      state.finalize_from_phase_outcome(last_outcome,
                                        self._last_execution_unit)
    else:
      state.finalize_normally()
333

334
  def _execute_phase(self, phase: phase_descriptor.PhaseDescriptor,
                     subtest_rec: Optional[test_record.SubtestRecord],
                     in_teardown: bool) -> _ExecutorReturn:
    """Runs (or skips) a single phase and maps its outcome to a return code.

    Args:
      phase: The phase to execute.
      subtest_rec: Current subtest record, if any.
      in_teardown: Indicates if currently processing a teardown sequence.

    Returns:
      _ExecutorReturn.TERMINAL if the outcome is terminal; otherwise
      _ExecutorReturn.CONTINUE.

    Raises:
      TestExecutionError: If the phase fails a subtest while not in a subtest.
    """
    log = self.logger
    if subtest_rec:
      log.debug('Executing phase %s (from %s) under subtest %s',
                phase.name, phase.func_location, subtest_rec.name)
    else:
      log.debug('Executing phase %s (from %s)', phase.name,
                phase.func_location)

    # Outside of teardown, phases of an already failed subtest are skipped.
    if not in_teardown and subtest_rec and subtest_rec.is_fail:
      self.phase_executor.skip_phase(phase, subtest_rec)
      return _ExecutorReturn.CONTINUE

    outcome, profile_stats = self.phase_executor.execute_phase(
        phase,
        run_with_profiling=self._run_phases_with_profiling,
        subtest_rec=subtest_rec,
    )
    if profile_stats is not None:
      self._phase_profile_stats.append(profile_stats)

    state = self.running_test_state
    if state.test_options.stop_on_first_failure or CONF.stop_on_first_failure:
      # Stop Test on first measurement failure
      # NOTE(review): this inspects the most recently recorded phase; when
      # phases run concurrently via _execute_phase_graph that record may
      # belong to a different phase - confirm.
      latest_phase_record = state.test_record.phases[-1]
      if latest_phase_record.outcome == test_record.PhaseOutcome.FAIL:
        outcome = phase_executor.PhaseExecutionOutcome(
            phase_descriptor.PhaseResult.STOP)
        self.logger.error('Stopping test because stop_on_first_failure is True')

    if outcome.is_terminal:
      # Only the first terminal outcome is kept.
      if not self._last_outcome:
        self._last_outcome = outcome
        self._last_execution_unit = phase.name
      return _ExecutorReturn.TERMINAL

    if outcome.is_fail_subtest:
      if not subtest_rec:
        raise TestExecutionError(
            'INVALID STATE: Phase returned outcome FAIL_SUBTEST when not '
            'in subtest.')
      subtest_rec.outcome = test_record.SubtestOutcome.FAIL
    return _ExecutorReturn.CONTINUE
382

383
  def _execute_checkpoint(self, checkpoint: phase_branches.Checkpoint,
                          subtest_rec: Optional[test_record.SubtestRecord],
                          in_teardown: bool) -> _ExecutorReturn:
    """Evaluates (or skips) a checkpoint and maps its outcome to a return code.

    Args:
      checkpoint: The checkpoint to evaluate.
      subtest_rec: Current subtest record, if any.
      in_teardown: Indicates if currently processing a teardown sequence.

    Returns:
      _ExecutorReturn.TERMINAL if the outcome is terminal; otherwise
      _ExecutorReturn.CONTINUE.

    Raises:
      TestExecutionError: If the checkpoint fails a subtest while not in one.
    """
    executor = self.phase_executor
    # Outside of teardown, checkpoints of a failed subtest are skipped.
    if not in_teardown and subtest_rec and subtest_rec.is_fail:
      executor.skip_checkpoint(checkpoint, subtest_rec)
      return _ExecutorReturn.CONTINUE

    outcome = executor.evaluate_checkpoint(checkpoint, subtest_rec)
    if outcome.is_terminal:
      # Only the first terminal outcome is kept.
      if not self._last_outcome:
        self._last_outcome = outcome
        self._last_execution_unit = checkpoint.name
      return _ExecutorReturn.TERMINAL

    if not outcome.is_fail_subtest:
      return _ExecutorReturn.CONTINUE
    if not subtest_rec:
      raise TestExecutionError(
          'INVALID STATE: Phase returned outcome FAIL_SUBTEST when not '
          'in subtest.')
    subtest_rec.outcome = test_record.SubtestOutcome.FAIL
    return _ExecutorReturn.CONTINUE
404

405
  def _log_sequence(self, phase_sequence, override_message):
    """Logs which sequence is about to run, if there is anything to name it.

    Args:
      phase_sequence: Sequence whose name is the default label.
      override_message: Label used instead of the sequence name when truthy.
    """
    sequence_name = phase_sequence.name
    label = override_message or sequence_name
    if not label:
      return
    self.logger.debug('Executing phase nodes for %s', label)
411

412
  def _execute_sequence(
      self,
      phase_sequence: phase_collections.PhaseSequence,
      subtest_rec: Optional[test_record.SubtestRecord],
      in_teardown: bool,
      override_message: Optional[Text] = None) -> _ExecutorReturn:
    """Logs the sequence, then runs it in teardown or abortable mode.

    Args:
      phase_sequence: Sequence of phase nodes to run.
      subtest_rec: Current subtest record, if any.
      in_teardown: Indicates if currently processing a teardown sequence.
      override_message: Optional message to override when logging.

    Returns:
      _ExecutorReturn for how to proceed.
    """
    self._log_sequence(phase_sequence, override_message)

    run_sequence = (
        self._execute_teardown_sequence
        if in_teardown else self._execute_abortable_sequence)
    return run_sequence(phase_sequence, subtest_rec)
435

436
  def _execute_abortable_sequence(
      self, phase_sequence: phase_collections.PhaseSequence,
      subtest_rec: Optional[test_record.SubtestRecord]) -> _ExecutorReturn:
    """Runs the nodes in order, stopping at the first non-CONTINUE or abort.

    Args:
      phase_sequence: Sequence of phase nodes to run.
      subtest_rec: Current subtest record, if any.

    Returns:
      _ExecutorReturn for how to proceed.
    """
    for node in phase_sequence.nodes:
      # The abort flag is re-checked before every node.
      if self._abort.is_set():
        return _ExecutorReturn.TERMINAL
      node_result = self._execute_node(node, subtest_rec, False)
      if node_result == _ExecutorReturn.CONTINUE:
        continue
      return node_result
    return _ExecutorReturn.CONTINUE
455

456
  def _execute_teardown_sequence(
      self, phase_sequence: phase_collections.PhaseSequence,
      subtest_rec: Optional[test_record.SubtestRecord]) -> _ExecutorReturn:
    """Runs every teardown node, regardless of errors, unless fully aborted.

    The teardown lock is held for the whole sequence so that a non-forced
    _stop_phase_executor call does not cancel teardown phases.

    Args:
      phase_sequence: Sequence of phase nodes to run.
      subtest_rec: Current subtest record, if any.

    Returns:
      The most critical _ExecutorReturn of the executed nodes, or TERMINAL if
      a full abort was requested.
    """
    most_critical = _ExecutorReturn.CONTINUE
    with self._teardown_phases_lock:
      for node in phase_sequence.nodes:
        if self._full_abort.is_set():
          return _ExecutorReturn.TERMINAL
        node_result = self._execute_node(node, subtest_rec, True)
        most_critical = _more_critical(most_critical, node_result)

    return most_critical
476

477
  @contextlib.contextmanager
  def _subtest_context(
      self, subtest: phase_collections.Subtest
  ) -> Iterator[test_record.SubtestRecord]:
    """Enter a subtest context.

    Creates a subtest record that starts out as PASS with the current time as
    its start time. When the context exits without an exception, the end time
    is stamped and the record is added to the test record.

    Args:
      subtest: The subtest running during the context.

    Yields:
      The subtest record for updating the outcome.
    """
    self.logger.debug('%s: Starting subtest.', subtest.name)
    started_millis = util.time_millis()
    record = test_record.SubtestRecord(
        name=subtest.name,
        start_time_millis=started_millis,
        outcome=test_record.SubtestOutcome.PASS)
    yield record
    # Not reached if the body of the `with` raised.
    record.end_time_millis = util.time_millis()
    self.test_state.test_record.add_subtest_record(record)
500

501
  def _execute_subtest(self, subtest: phase_collections.Subtest,
                       outer_subtest_rec: Optional[test_record.SubtestRecord],
                       in_teardown: bool) -> _ExecutorReturn:
    """Run a subtest node.

    Args:
      subtest: The subtest to run.
      outer_subtest_rec: Record of the enclosing subtest, if any. If it is
        already failing, this subtest starts out as failed too.
      in_teardown: Indicates if currently processing a teardown sequence.

    Returns:
      _ExecutorReturn of the subtest's sequence.
    """
    with self._subtest_context(subtest) as subtest_rec:
      if outer_subtest_rec and outer_subtest_rec.is_fail:
        subtest_rec.outcome = test_record.SubtestOutcome.FAIL

      result = self._execute_sequence(subtest, subtest_rec, in_teardown)

      if result == _ExecutorReturn.TERMINAL:
        subtest_rec.outcome = test_record.SubtestOutcome.STOP
        self.logger.debug('%s: Subtest stopping the test.', subtest.name)
      elif subtest_rec.outcome is test_record.SubtestOutcome.FAIL:
        self.logger.debug('%s: Subtest failed;', subtest.name)
      else:
        self.logger.debug('%s: Subtest passed.', subtest.name)
      return result
520

521
  def _execute_phase_branch(self, branch: phase_branches.BranchSequence,
                            subtest_rec: Optional[test_record.SubtestRecord],
                            in_teardown: bool) -> _ExecutorReturn:
    """Runs a branch sequence if its diagnosis condition is met.

    A branch record is added to the test record whenever the condition is
    evaluated. Outside of teardown, a branch inside an already failed subtest
    is neither evaluated nor recorded.

    Args:
      branch: The branch sequence to evaluate and possibly run.
      subtest_rec: Current subtest record, if any.
      in_teardown: Indicates if currently processing a teardown sequence.

    Returns:
      _ExecutorReturn of the branch's sequence, or CONTINUE if it did not run.
    """
    label = branch.diag_condition.message
    if branch.name:
      label = '{}:{}'.format(branch.name, label)

    if not in_teardown and subtest_rec and subtest_rec.is_fail:
      self.logger.debug('%s: Branch not being run due to failed subtest.',
                        label)
      return _ExecutorReturn.CONTINUE

    evaluated_millis = util.time_millis()
    diagnoses_store = self.running_test_state.diagnoses_manager.store
    branch_taken = bool(branch.should_run(diagnoses_store))
    if branch_taken:
      self.logger.debug('%s: Branch condition met; running phases.', label)
      result = self._execute_sequence(branch, subtest_rec, in_teardown)
    else:
      self.logger.debug('%s: Branch condition NOT met; not running sequence.',
                        label)
      result = _ExecutorReturn.CONTINUE

    branch_rec = test_record.BranchRecord.from_branch(branch, branch_taken,
                                                      evaluated_millis)
    self.running_test_state.test_record.add_branch_record(branch_rec)
    return result
548

549
  def _execute_phase_group(self, group: phase_group.PhaseGroup,
                           subtest_rec: Optional[test_record.SubtestRecord],
                           in_teardown: bool) -> _ExecutorReturn:
    """Executes the setup, main and teardown sequences of a phase group.

    If the setup sequence does not return CONTINUE, the group is left
    immediately and neither main nor teardown is run. Otherwise the teardown
    sequence is executed after main, whatever main returned.

    This function is recursive.  Do not construct phase groups that contain
    themselves.

    Args:
      group: phase_group.PhaseGroup, the phase group to execute.
      subtest_rec: Current subtest record, if any.
      in_teardown: Indicates if currently processing a teardown sequence.

    Returns:
      The setup result if setup did not return CONTINUE; otherwise the more
      critical _ExecutorReturn of the main and teardown sequences.
    """

    def _subtest_is_failing() -> bool:
      return subtest_rec is not None and subtest_rec.is_fail

    message_prefix = ''
    if group.name:
      self.logger.debug('Entering PhaseGroup %s', group.name)
      message_prefix = group.name + ':'

    # If in a subtest and it is already failing, the group will not be entered,
    # so the teardown phases will need to be skipped.
    skip_teardown = _subtest_is_failing()

    if group.setup:
      setup_result = self._execute_sequence(
          group.setup,
          subtest_rec,
          in_teardown,
          override_message=message_prefix + 'setup')
      if setup_result != _ExecutorReturn.CONTINUE:
        return setup_result
      # If the subtest fails during the setup, the group is still not entered,
      # so skip the teardown phases here as well.
      skip_teardown = skip_teardown or _subtest_is_failing()

    main_result = _ExecutorReturn.CONTINUE
    if group.main:
      main_result = self._execute_sequence(
          group.main,
          subtest_rec,
          in_teardown,
          override_message=message_prefix + 'main')

    teardown_result = _ExecutorReturn.CONTINUE
    if group.teardown:
      # If the subtest is already failing, the sequence is run in non-teardown
      # mode so that skips are recorded for the teardown phases.
      teardown_result = self._execute_sequence(
          group.teardown,
          subtest_rec,
          not skip_teardown,
          override_message=message_prefix + 'teardown')

    return _more_critical(main_result, teardown_result)
607

608
  def _execute_phase_graph(
      self,
      graph: phase_graph.PhaseGraph,
      subtest_rec: Optional[test_record.SubtestRecord],
      in_teardown: bool,
  ) -> _ExecutorReturn:
    """Executes the phases in a phase graph concurrently in DAG order.

    A node is submitted to a thread pool once every entry of its
    `options.prerequisites` (a name, or an object with a `name`) has completed
    with a non-terminal result. Nodes are tracked by name, and every node is
    run through `_execute_phase`.

    On any TERMINAL return, futures that have not started yet are cancelled.
    Phases that are already running cannot be cancelled; leaving the pool
    context waits for them to finish.

    Args:
      graph: The phase graph to execute.
      subtest_rec: Current subtest record, if any.
      in_teardown: Indicates if currently processing a teardown sequence.

    Returns:
      _ExecutorReturn.TERMINAL if the test was aborted, a phase returned a
      terminal result or its worker raised, or the remaining nodes can never
      be scheduled; otherwise _ExecutorReturn.CONTINUE.
    """
    if graph.name:
      self.logger.debug('Entering PhaseGraph %s', graph.name)

    nodes = list(graph.nodes)

    completed_phases = set()
    failed_phases = set()
    running_futures = {}  # Maps each in-flight future to its node name.

    def _cancel_unstarted() -> None:
      # Future.cancel() only succeeds for futures still waiting in the queue.
      for pending in running_futures:
        pending.cancel()

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=min(32, (multiprocessing.cpu_count() or 1) + 4)
    ) as pool:
      while len(completed_phases) + len(failed_phases) < len(nodes):
        if self._abort.is_set() or self._full_abort.is_set():
          _cancel_unstarted()
          return _ExecutorReturn.TERMINAL

        # Submit any unblocked and un-scheduled phase
        scheduled_names = set(running_futures.values())
        made_progress = False
        for node in nodes:
          if (
              node.name in completed_phases
              or node.name in failed_phases
              or node.name in scheduled_names
          ):
            continue

          prerequisite_satisfied = not node.options.prerequisites or all(
              (pr if isinstance(pr, str) else getattr(pr, 'name', None))
              in completed_phases
              for pr in node.options.prerequisites
          )

          if prerequisite_satisfied:
            self.running_test_state.add_concurrent_node(id(node))
            fut = pool.submit(
                self._execute_phase, node, subtest_rec, in_teardown
            )
            running_futures[fut] = node.name
            scheduled_names.add(node.name)
            made_progress = True

        if not running_futures and not made_progress:
          # Blocked completely / cycle or missing prerequisites
          blocked_names = [
              node.name for node in nodes
              if node.name not in completed_phases
              and node.name not in failed_phases
          ]
          self.logger.error(
              'PhaseGraph %s cannot make progress; prerequisites can never '
              'be satisfied for: %s', graph.name, blocked_names)
          return _ExecutorReturn.TERMINAL

        # Wait for at least one currently running future to complete
        done, _ = concurrent.futures.wait(
            running_futures.keys(),
            return_when=concurrent.futures.FIRST_COMPLETED,
        )

        for fut in done:
          p_name = running_futures.pop(fut)
          try:
            res = fut.result()
          except Exception:  # pylint: disable=broad-except
            self.logger.exception('Phase worker thread raised an exception.')
            failed_phases.add(p_name)
            # Record the failure the same way _initialize_plugs and
            # _execute_test_diagnoser do, so the test is not finalized as if
            # nothing went wrong. An earlier terminal outcome is kept.
            if not self._last_outcome:
              self._last_outcome = phase_executor.PhaseExecutionOutcome(
                  phase_executor.ExceptionInfo(*sys.exc_info()))
              self._last_execution_unit = p_name
            _cancel_unstarted()
            return _ExecutorReturn.TERMINAL

          if res == _ExecutorReturn.TERMINAL:
            failed_phases.add(p_name)
            # Without this, queued phases would still run while the pool
            # shuts down on exit from the `with` block.
            _cancel_unstarted()
            return _ExecutorReturn.TERMINAL
          completed_phases.add(p_name)

    return _ExecutorReturn.CONTINUE
683

684
  def _execute_node(self, node: phase_nodes.PhaseNode,
                    subtest_rec: Optional[test_record.SubtestRecord],
                    in_teardown: bool) -> _ExecutorReturn:
    """Dispatches a node to the handler for the first node type it matches.

    Args:
      node: The phase node to execute.
      subtest_rec: Current subtest record, if any.
      in_teardown: Indicates if currently processing a teardown sequence.

    Returns:
      The handler's _ExecutorReturn, or TERMINAL for an unhandled node type.
    """
    # Order matters: the first matching type wins. Subtest and BranchSequence
    # are checked before PhaseSequence (presumably they subclass it - verify
    # before reordering).
    handlers = (
        (phase_collections.Subtest, self._execute_subtest),
        (phase_branches.BranchSequence, self._execute_phase_branch),
        (phase_collections.PhaseSequence, self._execute_sequence),
        (phase_group.PhaseGroup, self._execute_phase_group),
        (phase_graph.PhaseGraph, self._execute_phase_graph),
        (phase_descriptor.PhaseDescriptor, self._execute_phase),
        (phase_branches.Checkpoint, self._execute_checkpoint),
    )
    for node_type, handler in handlers:
      if isinstance(node, node_type):
        return handler(node, subtest_rec, in_teardown)

    self.logger.error('Unhandled node type: %s', node)
    return _ExecutorReturn.TERMINAL
703

704
  def _execute_test_diagnoser(
      self, diagnoser: diagnoses_lib.BaseTestDiagnoser) -> None:
    """Runs one test diagnoser, turning an exception into a test outcome.

    If the diagnoser raises and no terminal outcome has been recorded yet, the
    exception becomes the recorded outcome. Otherwise it is only logged.

    Args:
      diagnoser: The test diagnoser to execute.
    """
    try:
      state = self.running_test_state
      state.diagnoses_manager.execute_test_diagnoser(
          diagnoser, state.test_record
      )
    except Exception:  # pylint: disable=broad-except
      already_terminal = self._last_outcome and self._last_outcome.is_terminal
      if not already_terminal:
        # Record the equivalent failure outcome and exit early.
        self._last_outcome = phase_executor.PhaseExecutionOutcome(
            phase_executor.ExceptionInfo(*sys.exc_info()))
        self._last_execution_unit = str(diagnoser.name)
        return
      self.logger.exception(
          'Test Diagnoser %s raised an exception, but the test outcome is '
          'already terminal; logging additional exception here.',
          diagnoser.name)
721

722
  def _execute_test_diagnosers(self) -> None:
    """Runs every diagnoser listed in the test options, in order."""
    configured_diagnosers = self._test_options.diagnosers
    for test_diagnoser in configured_diagnosers:
      self._execute_test_diagnoser(test_diagnoser)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc