• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

google / openhtf / 11827221077

13 Nov 2024 10:53PM UTC coverage: 61.689% (-0.01%) from 61.703%
11827221077

push

github

copybara-github
Use repr instead of str for phase result logging

PiperOrigin-RevId: 696280022

1 of 1 new or added line in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

4623 of 7494 relevant lines covered (61.69%)

3.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.54
/openhtf/core/phase_executor.py
1
# Copyright 2014 Google Inc. All Rights Reserved.
2

3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6

7
#     http://www.apache.org/licenses/LICENSE-2.0
8

9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""PhaseExecutor module for handling the phases of a test.
15

16
Each phase is an instance of phase_descriptor.PhaseDescriptor and therefore has
17
relevant options. Each option is taken into account when executing a phase,
18
such as checking options.run_if as soon as possible and timing out at the
19
appropriate time.
20

21
A phase must return a phase_descriptor.PhaseResult, one of CONTINUE, REPEAT, or
22
STOP. A phase may also return None, or have no return statement, which is the
23
same as returning openhtf.PhaseResult.CONTINUE.  These results are then acted
24
upon accordingly and a new test run status is returned.
25

26
Phases are always run in order and not allowed to loop back, though a phase may
27
choose to repeat itself by returning REPEAT. Returning STOP will cause a test
28
to stop early, allowing a test to detect a bad state and not waste any further
29
time. A phase should not return TIMEOUT or ABORT; those are handled by the
30
framework.
31
"""
32

33
import pstats
5✔
34
import sys
5✔
35
import threading
5✔
36
import time
5✔
37
import traceback
5✔
38
import types
5✔
39
from typing import Any, Dict, Optional, Text, Tuple, Type, TYPE_CHECKING, Union
5✔
40

41
import attr
5✔
42
from openhtf import util
5✔
43
from openhtf.core import phase_branches
5✔
44
from openhtf.core import phase_descriptor
5✔
45
from openhtf.core import test_record
5✔
46
from openhtf.util import argv
5✔
47
from openhtf.util import threads
5✔
48
from openhtf.util import timeouts
5✔
49

50
if TYPE_CHECKING:
5✔
51
  from openhtf.core import test_state as htf_test_state  # pylint: disable=g-import-not-at-top
×
52

53
# Timeout, in seconds, used for any phase whose options leave timeout_s unset.
DEFAULT_PHASE_TIMEOUT_S = 3 * 60
# Longest single wait, in seconds, for one join() call on a phase thread.
_JOIN_TRY_INTERVAL_SECONDS = 3

# Command-line hook that lets --phase_default_timeout_s overwrite the
# module-level DEFAULT_PHASE_TIMEOUT_S defined above.
# NOTE(review): no `type=` is given, so a value supplied on the command line
# may be stored as a string -- confirm how argv.StoreInModule converts it.
ARG_PARSER = argv.module_parser()
ARG_PARSER.add_argument(
    '--phase_default_timeout_s',
    action=argv.StoreInModule,
    target='{}.DEFAULT_PHASE_TIMEOUT_S'.format(__name__),
    default=DEFAULT_PHASE_TIMEOUT_S,
    help='Test phase timeout in seconds')
63

64

65
@attr.s(slots=True, frozen=True)
class ExceptionInfo(object):
  """Immutable holder for an exception's type, value and traceback."""

  exc_type = attr.ib(type=Type[BaseException])
  exc_val = attr.ib(type=BaseException)
  exc_tb = attr.ib(type=types.TracebackType)

  def as_base_types(self) -> Dict[Text, Text]:
    """Return the three fields as plain strings keyed by field name."""
    return dict(
        exc_type=str(self.exc_type),
        exc_val=str(self.exc_val),
        exc_tb=self.get_traceback_string(),
    )

  def get_traceback_string(self) -> Text:
    """Return the formatted traceback as one string."""
    formatted_lines = traceback.format_exception(
        self.exc_type, self.exc_val, self.exc_tb)
    return ''.join(formatted_lines)

  def __str__(self) -> Text:
    # Only the exception class name is shown; use as_base_types() for details.
    return self.exc_type.__name__
×
86

87

88
class InvalidPhaseResultError(Exception):
  """Signals that a phase or checkpoint produced an unacceptable result."""
90

91

92
@attr.s(slots=True, frozen=True)
class PhaseExecutionOutcome(object):
  """Internal wrapper describing how one phase execution ended.

  This is distinct from openhtf.PhaseResult, which enumerates the values a
  phase may return.  The wrapped phase_result is one of:

    * None: the phase timed out (see is_timeout).
    * A phase_descriptor.PhaseResult: the value the phase returned.
    * An ExceptionInfo: the phase raised (see raised_exception).
    * A threads.ThreadTerminationError: the phase was aborted (see is_aborted,
      which also counts as raised_exception).
  """

  phase_result = attr.ib(type=Union[None, phase_descriptor.PhaseResult,
                                    ExceptionInfo,
                                    threads.ThreadTerminationError])

  @property
  def is_aborted(self):
    """True if the wrapped result is a ThreadTerminationError."""
    result = self.phase_result
    return isinstance(result, threads.ThreadTerminationError)

  @property
  def is_fail_and_continue(self):
    """True if the wrapped result is PhaseResult.FAIL_AND_CONTINUE."""
    result = self.phase_result
    return result is phase_descriptor.PhaseResult.FAIL_AND_CONTINUE

  @property
  def is_fail_subtest(self):
    """True if the wrapped result is PhaseResult.FAIL_SUBTEST."""
    result = self.phase_result
    return result is phase_descriptor.PhaseResult.FAIL_SUBTEST

  @property
  def is_repeat(self):
    """True if the wrapped result is PhaseResult.REPEAT."""
    result = self.phase_result
    return result is phase_descriptor.PhaseResult.REPEAT

  @property
  def is_skip(self):
    """True if the wrapped result is PhaseResult.SKIP."""
    result = self.phase_result
    return result is phase_descriptor.PhaseResult.SKIP

  @property
  def is_terminal(self):
    """True if this result will stop the test."""
    if self.raised_exception or self.is_timeout:
      return True
    return self.phase_result is phase_descriptor.PhaseResult.STOP

  @property
  def is_timeout(self):
    """True if this outcome represents a phase timeout (result is None)."""
    return self.phase_result is None

  @property
  def raised_exception(self):
    """True if the phase raised an exception or was aborted."""
    exception_types = (ExceptionInfo, threads.ThreadTerminationError)
    return isinstance(self.phase_result, exception_types)
152

153

154
class PhaseExecutorThread(threads.KillableThread):
5✔
155
  """Handles the execution and result of a single test phase.
156

157
  The phase outcome will be stored in the _phase_execution_outcome attribute
158
  once it is known (_phase_execution_outcome is None until then), and it will be
159
  a PhaseExecutionOutcome instance.
160
  """
161
  daemon = True
5✔
162

163
  def __init__(self, phase_desc: phase_descriptor.PhaseDescriptor,
               test_state: 'htf_test_state.TestState', run_with_profiling: bool,
               subtest_rec: Optional[test_record.SubtestRecord]):
    """Prepare a thread that will run a single phase.

    Args:
      phase_desc: Descriptor of the phase to execute; it is called with
        test_state when the thread runs.
      test_state: State of the running test; also supplies the parent logger.
      run_with_profiling: Forwarded to the base thread constructor.
      subtest_rec: Record of the enclosing subtest, or None when the phase is
        not running inside a subtest.
    """
    super(PhaseExecutorThread, self).__init__(
        # Interpolate the actual phase name; previously the literal text
        # '(phase_desc.name)' was passed as the thread name.
        name='<PhaseExecutorThread: ({})>'.format(phase_desc.name),
        run_with_profiling=run_with_profiling,
        logger=test_state.state_logger.getChild('phase_executor_thread'))
    self._phase_desc = phase_desc
    self._test_state = test_state
    self._subtest_rec = subtest_rec
    # Stays None until the phase returns or raises.
    self._phase_execution_outcome = None  # type: Optional[PhaseExecutionOutcome]
5✔
174

175
  def _thread_proc(self) -> None:
    """Run the phase, validate what it returned, and store the outcome.

    Raises:
      InvalidPhaseResultError: If the phase returned something other than a
        PhaseResult (or None), or returned FAIL_SUBTEST while no subtest
        record was supplied.
    """
    returned = self._phase_desc(self._test_state)
    # A phase without an explicit return value is treated as CONTINUE.
    result = (
        phase_descriptor.PhaseResult.CONTINUE if returned is None else returned)

    if not isinstance(result, phase_descriptor.PhaseResult):
      raise InvalidPhaseResultError('Invalid phase result', result)

    wants_subtest_failure = (
        result is phase_descriptor.PhaseResult.FAIL_SUBTEST)
    if wants_subtest_failure and not self._subtest_rec:
      raise InvalidPhaseResultError(
          'Phase returned FAIL_SUBTEST but a subtest is not running.')

    self._phase_execution_outcome = PhaseExecutionOutcome(result)
5✔
189

190
  def _log_exception(self, *args: Any) -> Any:
    """Emit a critical log record; kept separate so unit tests can override."""
    log_critical = self._test_state.state_logger.critical
    log_critical(*args)
5✔
193

194
  def _thread_exception(self, *args) -> bool:
    """Record an exception raised by the phase as this thread's outcome.

    Args:
      *args: Forwarded unchanged to ExceptionInfo (exception type, value and
        traceback).

    Returns:
      Always True, so the exception is never propagated upward.
    """
    exc_info = ExceptionInfo(*args)
    self._phase_execution_outcome = PhaseExecutionOutcome(exc_info)
    self._log_exception('Phase %s raised an exception', self._phase_desc.name)
    return True
5✔
198

199
  def join_or_die(self) -> PhaseExecutionOutcome:
    """Wait for the phase thread and report how it ended.

    The wait is bounded by the phase's options.timeout_s, or by
    DEFAULT_PHASE_TIMEOUT_S when that option is None.  Each individual join()
    waits at most _JOIN_TRY_INTERVAL_SECONDS and never longer than the time
    left until the deadline, so short timeouts are not overshot.

    Returns:
      The outcome recorded by the thread if there is one.  Otherwise, an
      outcome wrapping None (timeout) if the thread is still alive, in which
      case the thread is also killed; or an outcome wrapping a
      ThreadTerminationError if the thread has exited without recording one.
    """
    timeout_s = self._phase_desc.options.timeout_s
    if timeout_s is None:
      timeout_s = DEFAULT_PHASE_TIMEOUT_S
    deadline = time.monotonic() + timeout_s

    while True:
      remaining_s = deadline - time.monotonic()
      if remaining_s <= 0:
        break
      # Using exception to kill thread is not honored when thread is busy,
      # so we leave the thread behind, and move on teardown.
      # Cap the wait to the remaining time so the deadline is respected even
      # when it is not a multiple of the polling interval.
      self.join(min(_JOIN_TRY_INTERVAL_SECONDS, remaining_s))
      if not self.is_alive() or self._killed.is_set():
        break

    # We got a return value or an exception and handled it.
    if self._phase_execution_outcome:
      return self._phase_execution_outcome

    # Check for timeout, indicated by None for
    # PhaseExecutionOutcome.phase_result.
    if self.is_alive():
      self.kill()
      return PhaseExecutionOutcome(None)

    # Phase was killed.
    return PhaseExecutionOutcome(threads.ThreadTerminationError())
5✔
223

224
  @property
  def name(self) -> Text:
    """Thread name; identical to the string form of this object."""
    return self.__str__()
5✔
227

228
  def __str__(self) -> Text:
    """Render as '<ClassName: (phase name)>'."""
    class_name = type(self).__name__
    phase_name = self._phase_desc.name
    return '<%s: (%s)>' % (class_name, phase_name)
5✔
230

231

232
class PhaseExecutor(object):
5✔
233
  """Encompasses the execution of the phases of a test."""
234

235
  def __init__(self, test_state: 'htf_test_state.TestState'):
    """Create an executor bound to the given test state.

    Args:
      test_state: State of the test whose phases will be executed; its
        state_logger is the parent of this executor's logger.
    """
    self.test_state = test_state
    self.logger = test_state.state_logger.getChild('phase_executor')
    # Set by stop() and cleared by reset_stop().
    self._stopping = threading.Event()
    # No phase thread is running yet.
    self._current_phase_thread = None  # type: Optional[PhaseExecutorThread]
    # This lock exists to prevent stop() calls from being ignored if called when
    # _execute_phase_once is setting up the next phase thread.
    self._current_phase_thread_lock = threading.Lock()
5✔
243

244
  def _should_repeat(self, phase: phase_descriptor.PhaseDescriptor,
                     phase_execution_outcome: PhaseExecutionOutcome) -> bool:
    """Decide whether the phase should be run again.

    Args:
      phase: The phase that just ran; its options control repetition.
      phase_execution_outcome: Outcome of the run that just finished.

    Returns:
      True when the phase timed out with repeat_on_timeout set, returned
      REPEAT, or has force_repeat set.  Otherwise, when
      repeat_on_measurement_fail is set, True only if the most recent phase
      record has a FAIL outcome.  False in every other case.
    """
    options = phase.options
    if ((phase_execution_outcome.is_timeout and options.repeat_on_timeout) or
        phase_execution_outcome.is_repeat or options.force_repeat):
      return True
    if not options.repeat_on_measurement_fail:
      return False
    # The record of the run that just finished is the last one appended.
    latest_record = self.test_state.test_record.phases[-1]
    return latest_record.outcome == test_record.PhaseOutcome.FAIL
5✔
257

258
  def execute_phase(
      self,
      phase: phase_descriptor.PhaseDescriptor,
      run_with_profiling: bool = False,
      subtest_rec: Optional[test_record.SubtestRecord] = None
  ) -> Tuple[PhaseExecutionOutcome, Optional[pstats.Stats]]:
    """Executes a phase, repeating it as needed, and returns the final outcome.

    Args:
      phase: Phase to execute.
      run_with_profiling: Whether to run with cProfile stat collection for the
        phase code run inside a thread.
      subtest_rec: Optional subtest record.

    Returns:
      A two-tuple; the first item is the final PhaseExecutionOutcome that wraps
      the phase return value (or exception) of the final phase run. All
      intermediary results, if any, are REPEAT and handled internally. Returning
      REPEAT here means the phase hit its limit for repetitions.
      The second tuple item is the profiler Stats object if profiling was
      requested and successfully ran for this phase execution.
      If the executor is stopped before a run starts, the result is a timeout
      outcome paired with None.
    """
    limit = phase.options.repeat_limit or phase_descriptor.DEFAULT_REPEAT_LIMIT
    attempt = 1
    while not self._stopping.is_set():
      is_last_repeat = attempt >= limit
      outcome, profile_stats = self._execute_phase_once(
          phase, is_last_repeat, run_with_profiling, subtest_rec)
      wants_repeat = self._should_repeat(phase, outcome)
      if not wants_repeat or is_last_repeat:
        return outcome, profile_stats
      attempt += 1

    # We've been cancelled, so just 'timeout' the phase.
    return PhaseExecutionOutcome(None), None
×
296

297
  def _execute_phase_once(
      self,
      phase_desc: phase_descriptor.PhaseDescriptor,
      is_last_repeat: bool,
      run_with_profiling: bool,
      subtest_rec: Optional[test_record.SubtestRecord],
  ) -> Tuple[PhaseExecutionOutcome, Optional[pstats.Stats]]:
    """Executes the given phase a single time.

    Args:
      phase_desc: Phase to execute.
      is_last_repeat: True when no further repetition is allowed; a REPEAT
        result is then converted into STOP.
      run_with_profiling: Whether to collect profile stats from the phase
        thread.
      subtest_rec: Optional subtest record the phase runs under.

    Returns:
      A two-tuple of the PhaseExecutionOutcome and, if profiling was requested,
      the phase thread's profile stats (otherwise None).  A SKIP outcome is
      returned when run_if evaluates falsey, and a ThreadTerminationError
      outcome when the executor is already stopping.
    """
    # Check this before we create a PhaseState and PhaseRecord.
    if phase_desc.options.run_if and not phase_desc.options.run_if():
      self.logger.debug('Phase %s skipped due to run_if returning falsey.',
                        phase_desc.name)
      return PhaseExecutionOutcome(phase_descriptor.PhaseResult.SKIP), None

    override_result = None
    with self.test_state.running_phase_context(phase_desc) as phase_state:
      if subtest_rec:
        # Argument order must follow the placeholders: phase name, subtest
        # name, then source location.
        self.logger.debug('Executing phase %s under subtest %s (from %s)',
                          phase_desc.name, subtest_rec.name,
                          phase_desc.func_location)
        phase_state.set_subtest_name(subtest_rec.name)
      else:
        self.logger.debug('Executing phase %s (from %s)', phase_desc.name,
                          phase_desc.func_location)
      with self._current_phase_thread_lock:
        # Checking _stopping must be in the lock context, otherwise there is a
        # race condition: this thread checks _stopping and then switches to
        # another thread where stop() sets _stopping and checks
        # _current_phase_thread (which would not be set yet).  In that case, the
        # new phase thread would still be started.
        if self._stopping.is_set():
          # PhaseRecord will be written at this point, so ensure that it has a
          # Killed result.
          result = PhaseExecutionOutcome(threads.ThreadTerminationError())
          phase_state.result = result
          return result, None
        phase_thread = PhaseExecutorThread(phase_desc, self.test_state,
                                           run_with_profiling, subtest_rec)
        phase_thread.start()
        self._current_phase_thread = phase_thread

      phase_state.result = phase_thread.join_or_die()
      if phase_state.result.is_repeat and is_last_repeat:
        self.logger.error('Phase returned REPEAT, exceeding repeat_limit.')
        phase_state.hit_repeat_limit = True
        override_result = PhaseExecutionOutcome(
            phase_descriptor.PhaseResult.STOP)
      self._current_phase_thread = None

    # Refresh the result in case a validation for a partially set measurement
    # or phase diagnoser raised an exception.
    result = override_result or phase_state.result
    self.logger.debug('Phase %s finished with result %r', phase_desc.name,
                      result.phase_result)
    return (result,
            phase_thread.get_profile_stats() if run_with_profiling else None)
353

354
  def skip_phase(self, phase_desc: phase_descriptor.PhaseDescriptor,
                 subtest_rec: Optional[test_record.SubtestRecord]) -> None:
    """Record a phase as skipped without executing it.

    Args:
      phase_desc: Phase being skipped.
      subtest_rec: Optional subtest record; when given, its name is attached
        to the phase state.
    """
    self.logger.debug('Automatically skipping phase %s', phase_desc.name)
    with self.test_state.running_phase_context(phase_desc) as phase_state:
      if subtest_rec:
        phase_state.set_subtest_name(subtest_rec.name)
      skipped = PhaseExecutionOutcome(phase_descriptor.PhaseResult.SKIP)
      phase_state.result = skipped
363

364
  def evaluate_checkpoint(
      self, checkpoint: phase_branches.Checkpoint,
      subtest_rec: Optional[test_record.SubtestRecord]
  ) -> PhaseExecutionOutcome:
    """Evaluate a checkpoint and add a record of it to the test record.

    Args:
      checkpoint: Checkpoint to evaluate.
      subtest_rec: Optional subtest record the checkpoint runs under.

    Returns:
      The PhaseExecutionOutcome wrapping the checkpoint result.  Any exception
      raised while evaluating, including a FAIL_SUBTEST result without a
      subtest, is captured and returned as an ExceptionInfo outcome.
    """
    subtest_name = subtest_rec.name if subtest_rec else None
    if subtest_rec:
      self.logger.debug('Evaluating checkpoint %s under subtest %s',
                        checkpoint.name, subtest_name)
    else:
      self.logger.debug('Evaluating checkpoint %s', checkpoint.name)

    evaluated_millis = util.time_millis()
    try:
      checkpoint_result = checkpoint.get_result(self.test_state, subtest_rec)
      outcome = PhaseExecutionOutcome(checkpoint_result)
      self.logger.debug('Checkpoint %s result: %s', checkpoint.name,
                        outcome.phase_result)
      if outcome.is_fail_subtest and not subtest_rec:
        raise InvalidPhaseResultError(
            'Checkpoint returned FAIL_SUBTEST, but subtest not running.')
    except Exception:  # pylint: disable=broad-except
      # Convert the failure into an outcome so it is still recorded below.
      outcome = PhaseExecutionOutcome(ExceptionInfo(*sys.exc_info()))

    record = test_record.CheckpointRecord.from_checkpoint(
        checkpoint, subtest_name, outcome, evaluated_millis)
    self.test_state.test_record.add_checkpoint_record(record)
    return outcome
5✔
394

395
  def skip_checkpoint(self, checkpoint: phase_branches.Checkpoint,
                      subtest_rec: Optional[test_record.SubtestRecord]) -> None:
    """Record a checkpoint as skipped without evaluating it.

    Args:
      checkpoint: Checkpoint being skipped.
      subtest_rec: Optional subtest record; its name, if any, is stored on the
        checkpoint record.
    """
    self.logger.debug('Automatically skipping checkpoint %s', checkpoint.name)
    if subtest_rec:
      subtest_name = subtest_rec.name
    else:
      subtest_name = None
    skipped = PhaseExecutionOutcome(phase_descriptor.PhaseResult.SKIP)
    record = test_record.CheckpointRecord.from_checkpoint(
        checkpoint, subtest_name, skipped, util.time_millis())
    self.test_state.test_record.add_checkpoint_record(record)
5✔
405

406
  def reset_stop(self) -> None:
    """Clear the stopping flag set by stop()."""
    stopping_flag = self._stopping
    stopping_flag.clear()
5✔
408

409
  def stop(
      self,
      timeout_s: Union[None, int, float,
                       timeouts.PolledTimeout] = None) -> None:
    """Stops execution of the current phase, if any.

    It will raise a ThreadTerminationError, which will cause the test to stop
    executing and terminate with an ERROR state.

    Args:
      timeout_s: Seconds (int or float), a PolledTimeout, or None; passed to
        timeouts.PolledTimeout.from_seconds to bound the wait for the phase
        thread to exit.
    """
    self._stopping.set()
    # Read the current thread under the lock so a phase being set up
    # concurrently is not missed.
    with self._current_phase_thread_lock:
      phase_thread = self._current_phase_thread
    if not phase_thread:
      return

    if phase_thread.is_alive():
      phase_thread.kill()

      self.logger.debug('Waiting for cancelled phase to exit: %s', phase_thread)
      timeout = timeouts.PolledTimeout.from_seconds(timeout_s)
      while phase_thread.is_alive() and not timeout.has_expired():
        time.sleep(0.1)
      exit_verb = "didn't" if phase_thread.is_alive() else 'did'
      self.logger.debug('Cancelled phase %s exit', exit_verb)

    # Clear the currently running phase, whether it finished or timed out.
    self.test_state.stop_running_phase()
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc