• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pytransitions / transitions / 8938991339

03 May 2024 12:30PM UTC coverage: 98.432% (+0.2%) from 98.217%
8938991339

push

github

aleneum
use coverage only for mypy job and update setup.py tags

5149 of 5231 relevant lines covered (98.43%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.3
/transitions/extensions/nesting.py
1
# -*- coding: utf-8 -*-
2
"""
1✔
3
    transitions.extensions.nesting
4
    ------------------------------
5

6
    Implements a hierarchical state machine based on transitions.core.Machine. Supports nested states, parallel states
7
    and the composition of multiple hierarchical state machines.
8
"""
9

10
from collections import OrderedDict
1✔
11
import copy
1✔
12
from functools import partial, reduce
1✔
13
import inspect
1✔
14
import logging
1✔
15

16
try:
1✔
17
    # Enums are supported for Python 3.4+ and Python 2.7 with enum34 package installed
18
    from enum import Enum, EnumMeta
1✔
19
except ImportError:  # pragma: no cover
20
    # If enum is not available, create dummy classes for type checks
21
    class Enum:  # type: ignore
22
        """This is just an Enum stub for Python 2 and Python 3.3 and before without Enum support."""
23

24
    class EnumMeta:  # type: ignore
25
        """This is just an EnumMeta stub for Python 2 and Python 3.3 and before without Enum support."""
26

27
from six import string_types
1✔
28

29
from ..core import State, Machine, Transition, Event, listify, MachineError, EventData
1✔
30

31
_LOGGER = logging.getLogger(__name__)
1✔
32
_LOGGER.addHandler(logging.NullHandler())
1✔
33

34

35
# converts a hierarchical tree into a list of current states
36
def _build_state_list(state_tree, separator, prefix=None):
1✔
37
    prefix = prefix or []
1✔
38
    res = []
1✔
39
    for key, value in state_tree.items():
1✔
40
        if value:
1✔
41
            res.append(_build_state_list(value, separator, prefix=prefix + [key]))
1✔
42
        else:
43
            res.append(separator.join(prefix + [key]))
1✔
44
    return res if len(res) > 1 else res[0]
1✔
45

46

47
def resolve_order(state_tree):
1✔
48
    """Converts a (model) state tree into a list of state paths. States are ordered in the way in which states
49
    should be visited to process the event correctly (Breadth-first). This makes sure that ALL children are evaluated
50
    before parents in parallel states.
51
    Args:
52
        state_tree (dict): A dictionary representation of the model's state.
53
    Returns:
54
        list of lists of str representing the order of states to be processed.
55
    """
56
    queue = []
1✔
57
    res = []
1✔
58
    prefix = []
1✔
59
    while True:
1✔
60
        for state_name in reversed(list(state_tree.keys())):
1✔
61
            scope = prefix + [state_name]
1✔
62
            res.append(scope)
1✔
63
            if state_tree[state_name]:
1✔
64
                queue.append((scope, state_tree[state_name]))
1✔
65
        if not queue:
1✔
66
            break
1✔
67
        prefix, state_tree = queue.pop(0)
1✔
68
    return reversed(res)
1✔
69

70

71
class FunctionWrapper(object):
1✔
72
    """A wrapper to enable transitions' convenience function to_<state> for nested states.
1✔
73
        This allows to call model.to_A.s1.C() in case a custom separator has been chosen."""
74
    def __init__(self, func):
1✔
75
        """
76
        Args:
77
            func: Function to be called at the end of the path.
78
            path: If path is an empty string, assign function
79
        """
80
        self._func = func
1✔
81

82
    def add(self, func, path):
1✔
83
        """Assigns a `FunctionWrapper` as an attribute named like the next segment of the substates
84
            path.
85
        Args:
86
            func (callable): Function to be called at the end of the path.
87
            path (list of strings): Remaining segment of the substate path.
88
        """
89
        name = path[0]
1✔
90
        if name[0].isdigit():
1✔
91
            name = 's' + name
1✔
92
        if hasattr(self, name):
1✔
93
            getattr(self, name).add(func, path[1:])
1✔
94
        else:
95
            assert not path[1:], "nested path should be empty"
1✔
96
            setattr(self, name, FunctionWrapper(func))
1✔
97

98
    def __call__(self, *args, **kwargs):
1✔
99
        return self._func(*args, **kwargs)
1✔
100

101

102
class NestedEvent(Event):
1✔
103
    """An event type to work with nested states.
1✔
104
        This subclass is NOT compatible with simple Machine instances.
105
    """
106

107
    def trigger(self, model, *args, **kwargs):
1✔
108
        raise RuntimeError("NestedEvent.trigger must not be called directly. Call Machine.trigger_event instead.")
×
109

110
    def trigger_nested(self, event_data):
1✔
111
        """Executes all transitions that match the current state,
112
        halting as soon as one successfully completes.
113
        It is up to the machine's configuration of the Event whether processing happens queued (sequentially) or
114
        whether further Events are processed as they occur. NOTE: This should only
115
        be called by HierarchicalMachine instances.
116
        Args:
117
            event_data (NestedEventData): The currently processed event
118
        Returns: boolean indicating whether or not a transition was
119
            successfully executed (True if successful, False if not).
120
        """
121
        machine = event_data.machine
1✔
122
        model = event_data.model
1✔
123
        state_tree = machine.build_state_tree(getattr(model, machine.model_attribute), machine.state_cls.separator)
1✔
124
        state_tree = reduce(dict.get, machine.get_global_name(join=False), state_tree)
1✔
125
        ordered_states = resolve_order(state_tree)
1✔
126
        done = set()
1✔
127
        event_data.event = self
1✔
128
        for state_path in ordered_states:
1✔
129
            state_name = machine.state_cls.separator.join(state_path)
1✔
130
            if state_name not in done and state_name in self.transitions:
1✔
131
                event_data.state = machine.get_state(state_name)
1✔
132
                event_data.source_name = state_name
1✔
133
                event_data.source_path = copy.copy(state_path)
1✔
134
                self._process(event_data)
1✔
135
                if event_data.result:
1✔
136
                    elems = state_path
1✔
137
                    while elems:
1✔
138
                        done.add(machine.state_cls.separator.join(elems))
1✔
139
                        elems.pop()
1✔
140
        return event_data.result
1✔
141

142
    def _process(self, event_data):
1✔
143
        machine = event_data.machine
1✔
144
        machine.callbacks(event_data.machine.prepare_event, event_data)
1✔
145
        _LOGGER.debug("%sExecuted machine preparation callbacks before conditions.", machine.name)
1✔
146
        for trans in self.transitions[event_data.source_name]:
1✔
147
            event_data.transition = trans
1✔
148
            event_data.result = trans.execute(event_data)
1✔
149
            if event_data.result:
1✔
150
                break
1✔
151

152

153
class NestedEventData(EventData):
1✔
154
    """Collection of relevant data related to the ongoing nested transition attempt."""
1✔
155

156
    def __init__(self, state, event, machine, model, args, kwargs):
1✔
157
        super(NestedEventData, self).__init__(state, event, machine, model, args, kwargs)
1✔
158
        self.source_path = None
1✔
159
        self.source_name = None
1✔
160

161

162
class NestedState(State):
1✔
163
    """A state which allows substates.
1✔
164
    Attributes:
165
        states (OrderedDict): A list of substates of the current state.
166
        events (dict): A list of events defined for the nested state.
167
        initial (list, str, NestedState or Enum): (Name of a) child or list of children that should be entered when the
168
        state is entered.
169
    """
170

171
    separator = '_'
1✔
172
    u""" Separator between the names of parent and child states. In case '_' is required for
1✔
173
        naming state, this value can be set to other values such as '.' or even unicode characters
174
        such as '↦' (limited to Python 3 though).
175
    """
176

177
    def __init__(self, name, on_enter=None, on_exit=None, ignore_invalid_triggers=None, initial=None):
1✔
178
        super(NestedState, self).__init__(name=name, on_enter=on_enter, on_exit=on_exit,
1✔
179
                                          ignore_invalid_triggers=ignore_invalid_triggers)
180
        self.initial = initial
1✔
181
        self.events = {}
1✔
182
        self.states = OrderedDict()
1✔
183
        self._scope = []
1✔
184

185
    def add_substate(self, state):
1✔
186
        """Adds a state as a substate.
187
        Args:
188
            state (NestedState): State to add to the current state.
189
        """
190
        self.add_substates(state)
1✔
191

192
    def add_substates(self, states):
1✔
193
        """Adds a list of states to the current state.
194
        Args:
195
            states (list): List of state to add to the current state.
196
        """
197
        for state in listify(states):
1✔
198
            self.states[state.name] = state
1✔
199

200
    def scoped_enter(self, event_data, scope=None):
1✔
201
        """Enters a state with the provided scope.
202
        Args:
203
            event_data (NestedEventData): The currently processed event.
204
            scope (list(str)): Names of the state's parents starting with the top most parent.
205
        """
206
        self._scope = scope or []
1✔
207
        try:
1✔
208
            self.enter(event_data)
1✔
209
        finally:
210
            self._scope = []
1✔
211

212
    def scoped_exit(self, event_data, scope=None):
1✔
213
        """Exits a state with the provided scope.
214
        Args:
215
            event_data (NestedEventData): The currently processed event.
216
            scope (list(str)): Names of the state's parents starting with the top most parent.
217
        """
218
        self._scope = scope or []
1✔
219
        try:
1✔
220
            self.exit(event_data)
1✔
221
        finally:
222
            self._scope = []
1✔
223

224
    @property
1✔
225
    def name(self):
1✔
226
        return self.separator.join(self._scope + [super(NestedState, self).name])
1✔
227

228

229
class NestedTransition(Transition):
1✔
230
    """A transition which handles entering and leaving nested states."""
1✔
231

232
    def _resolve_transition(self, event_data):
1✔
233
        dst_name_path = self.dest.split(event_data.machine.state_cls.separator)
1✔
234
        _ = event_data.machine.get_state(dst_name_path)
1✔
235
        state_tree = event_data.machine.build_state_tree(
1✔
236
            listify(getattr(event_data.model, event_data.machine.model_attribute)),
237
            event_data.machine.state_cls.separator)
238

239
        scope = event_data.machine.get_global_name(join=False)
1✔
240
        tmp_tree = state_tree.get(dst_name_path[0], None)
1✔
241
        root = []
1✔
242
        while tmp_tree is not None:
1✔
243
            root.append(dst_name_path.pop(0))
1✔
244
            tmp_tree = tmp_tree.get(dst_name_path[0], None) if len(dst_name_path) > 0 else None
1✔
245

246
        # when destination is empty this means we are already in the state we want to enter
247
        # we deal with a reflexive transition here or a sibling state that has already been entered
248
        # as internal transitions have been already dealt with
249
        # the 'root' of src and dest will be set to the parent and dst (and src) substate will be set as destination
250
        if not dst_name_path:
1✔
251
            dst_name_path = [root.pop()]
1✔
252

253
        scoped_tree = reduce(dict.get, scope + root, state_tree)
1✔
254

255
        # if our scope is a parallel state we need to narrow down the exit scope to the targeted sibling
256
        if len(scoped_tree) > 1:
1✔
257
            exit_scope = {dst_name_path[0]: scoped_tree.get(dst_name_path[0])}
1✔
258
        else:
259
            exit_scope = scoped_tree
1✔
260

261
        exit_partials = [partial(event_data.machine.get_state(root + state_name).scoped_exit,
1✔
262
                                 event_data, scope + root + state_name[:-1])
263
                         for state_name in resolve_order(exit_scope)]
264

265
        new_states, enter_partials = self._enter_nested(root, dst_name_path, scope + root, event_data)
1✔
266

267
        # we reset/clear the whole branch if it is scoped, otherwise only reset the sibling
268
        if exit_scope == scoped_tree:
1✔
269
            scoped_tree.clear()
1✔
270
        for new_key, value in new_states.items():
1✔
271
            scoped_tree[new_key] = value
1✔
272
            break
1✔
273

274
        return state_tree, exit_partials, enter_partials
1✔
275

276
    def _change_state(self, event_data):
1✔
277
        state_tree, exit_partials, enter_partials = self._resolve_transition(event_data)
1✔
278
        for func in exit_partials:
1✔
279
            func()
1✔
280
        self._update_model(event_data, state_tree)
1✔
281
        for func in enter_partials:
1✔
282
            func()
1✔
283

284
    def _enter_nested(self, root, dest, prefix_path, event_data):
1✔
285
        if root:
1✔
286
            state_name = root.pop(0)
1✔
287
            with event_data.machine(state_name):
1✔
288
                return self._enter_nested(root, dest, prefix_path, event_data)
1✔
289
        elif dest:
1✔
290
            new_states = OrderedDict()
1✔
291
            state_name = dest.pop(0)
1✔
292
            with event_data.machine(state_name):
1✔
293
                new_states[state_name], new_enter = self._enter_nested([], dest, prefix_path + [state_name], event_data)
1✔
294
                enter_partials = [partial(event_data.machine.scoped.scoped_enter, event_data, prefix_path)] + new_enter
1✔
295
            return new_states, enter_partials
1✔
296
        elif event_data.machine.scoped.initial:
1✔
297
            new_states = OrderedDict()
1✔
298
            enter_partials = []
1✔
299
            queue = []
1✔
300
            prefix = prefix_path
1✔
301
            scoped_tree = new_states
1✔
302
            initial_names = [i.name if hasattr(i, 'name') else i for i in listify(event_data.machine.scoped.initial)]
1✔
303
            initial_states = [event_data.machine.scoped.states[n] for n in initial_names]
1✔
304
            while True:
1✔
305
                event_data.scope = prefix
1✔
306
                for state in initial_states:
1✔
307
                    enter_partials.append(partial(state.scoped_enter, event_data, prefix))
1✔
308
                    scoped_tree[state.name] = OrderedDict()
1✔
309
                    if state.initial:
1✔
310
                        queue.append((scoped_tree[state.name], prefix + [state.name],
1✔
311
                                     [state.states[i.name] if hasattr(i, 'name') else state.states[i]
312
                                     for i in listify(state.initial)]))
313
                if not queue:
1✔
314
                    break
1✔
315
                scoped_tree, prefix, initial_states = queue.pop(0)
1✔
316
            return new_states, enter_partials
1✔
317
        else:
318
            return {}, []
1✔
319

320
    @staticmethod
1✔
321
    def _update_model(event_data, tree):
1✔
322
        model_states = _build_state_list(tree, event_data.machine.state_cls.separator)
1✔
323
        with event_data.machine():
1✔
324
            event_data.machine.set_state(model_states, event_data.model)
1✔
325
            states = event_data.machine.get_states(listify(model_states))
1✔
326
            event_data.state = states[0] if len(states) == 1 else states
1✔
327

328
    # Prevent deep copying of callback lists since these include either references to callable or
329
    # strings. Deep copying a method reference would lead to the creation of an entire new (model) object
330
    # (see https://github.com/pytransitions/transitions/issues/248)
331
    # Note: When conditions are handled like other dynamic callbacks the key == "conditions" clause can be removed
332
    def __deepcopy__(self, memo):
1✔
333
        cls = self.__class__
1✔
334
        result = cls.__new__(cls)
1✔
335
        memo[id(self)] = result
1✔
336
        for key, value in self.__dict__.items():
1✔
337
            if key in cls.dynamic_methods or key == "conditions":
1✔
338
                setattr(result, key, copy.copy(value))
1✔
339
            else:
340
                setattr(result, key, copy.deepcopy(value, memo))
1✔
341
        return result
1✔
342

343

344
class HierarchicalMachine(Machine):
1✔
345
    """Extends transitions.core.Machine by capabilities to handle nested states.
1✔
346
        A hierarchical machine REQUIRES NestedStates, NestedEvent and NestedTransitions
347
        (or any subclass of it) to operate.
348
    """
349

350
    state_cls = NestedState
1✔
351
    transition_cls = NestedTransition
1✔
352
    event_cls = NestedEvent
1✔
353

354
    def __init__(self, model=Machine.self_literal, states=None, initial='initial', transitions=None,
1✔
355
                 send_event=False, auto_transitions=True,
356
                 ordered_transitions=False, ignore_invalid_triggers=None,
357
                 before_state_change=None, after_state_change=None, name=None,
358
                 queued=False, prepare_event=None, finalize_event=None, model_attribute='state', on_exception=None,
359
                 **kwargs):
360
        assert issubclass(self.state_cls, NestedState)
1✔
361
        assert issubclass(self.event_cls, NestedEvent)
1✔
362
        assert issubclass(self.transition_cls, NestedTransition)
1✔
363
        self._stack = []
1✔
364
        self.prefix_path = []
1✔
365
        self.scoped = self
1✔
366
        self._next_scope = None
1✔
367
        super(HierarchicalMachine, self).__init__(
1✔
368
            model=model, states=states, initial=initial, transitions=transitions,
369
            send_event=send_event, auto_transitions=auto_transitions,
370
            ordered_transitions=ordered_transitions, ignore_invalid_triggers=ignore_invalid_triggers,
371
            before_state_change=before_state_change, after_state_change=after_state_change, name=name,
372
            queued=queued, prepare_event=prepare_event, finalize_event=finalize_event, model_attribute=model_attribute,
373
            on_exception=on_exception, **kwargs
374
        )
375

376
    def __call__(self, to_scope=None):
1✔
377
        if isinstance(to_scope, string_types):
1✔
378
            state_name = to_scope.split(self.state_cls.separator)[0]
1✔
379
            state = self.states[state_name]
1✔
380
            to_scope = (state, state.states, state.events, self.prefix_path + [state_name])
1✔
381
        elif isinstance(to_scope, Enum):
1✔
382
            state = self.states[to_scope.name]
1✔
383
            to_scope = (state, state.states, state.events, self.prefix_path + [to_scope.name])
1✔
384
        elif to_scope is None:
1✔
385
            if self._stack:
1✔
386
                to_scope = self._stack[0]
1✔
387
            else:
388
                to_scope = (self, self.states, self.events, [])
1✔
389
        self._next_scope = to_scope
1✔
390

391
        return self
1✔
392

393
    def __enter__(self):
1✔
394
        self._stack.append((self.scoped, self.states, self.events, self.prefix_path))
1✔
395
        self.scoped, self.states, self.events, self.prefix_path = self._next_scope
1✔
396
        self._next_scope = None
1✔
397

398
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
399
        self.scoped, self.states, self.events, self.prefix_path = self._stack.pop()
1✔
400

401
    def add_model(self, model, initial=None):
1✔
402
        """Extends transitions.core.Machine.add_model by applying a custom 'to' function to
403
            the added model.
404
        """
405
        models = [self if mod is self.self_literal else mod for mod in listify(model)]
1✔
406
        super(HierarchicalMachine, self).add_model(models, initial=initial)
1✔
407
        initial_name = getattr(models[0], self.model_attribute)
1✔
408
        if hasattr(initial_name, 'name'):
1✔
409
            initial_name = initial_name.name
1✔
410
        # initial states set by add_model or machine might contain initial states themselves.
411
        if isinstance(initial_name, string_types):
1✔
412
            initial_states = self._resolve_initial(models, initial_name.split(self.state_cls.separator))
1✔
413
        # when initial is set to a (parallel) state, we accept it as it is
414
        else:
415
            initial_states = initial_name
1✔
416
        for mod in models:
1✔
417
            self.set_state(initial_states, mod)
1✔
418
            if hasattr(mod, 'to'):
1✔
419
                _LOGGER.warning("%sModel already has a 'to'-method. It will NOT "
1✔
420
                                "be overwritten by NestedMachine", self.name)
421
            else:
422
                to_func = partial(self.to_state, mod)
1✔
423
                setattr(mod, 'to', to_func)
1✔
424

425
    @property
1✔
426
    def initial(self):
1✔
427
        """Return the initial state."""
428
        return self._initial
1✔
429

430
    @initial.setter
1✔
431
    def initial(self, value):
1✔
432
        self._initial = self._recursive_initial(value)
1✔
433

434
    def add_ordered_transitions(self, states=None, trigger='next_state',
1✔
435
                                loop=True, loop_includes_initial=True,
436
                                conditions=None, unless=None, before=None,
437
                                after=None, prepare=None, **kwargs):
438
        if states is None:
1✔
439
            states = self.get_nested_state_names()
1✔
440
        super(HierarchicalMachine, self).add_ordered_transitions(states=states, trigger=trigger, loop=loop,
1✔
441
                                                                 loop_includes_initial=loop_includes_initial,
442
                                                                 conditions=conditions,
443
                                                                 unless=unless, before=before, after=after,
444
                                                                 prepare=prepare, **kwargs)
445

446
    def add_states(self, states, on_enter=None, on_exit=None, ignore_invalid_triggers=None, **kwargs):
1✔
447
        """Add new nested state(s).
448
        Args:
449
            states (list, str, dict, Enum, NestedState or HierarchicalMachine): a list, a NestedState instance, the
450
                name of a new state, an enumeration (member) or a dict with keywords to pass on to the
451
                NestedState initializer. If a list, each element can be a string, dict, NestedState or
452
                enumeration member.
453
            on_enter (str or list): callbacks to trigger when the state is
454
                entered. Only valid if first argument is string.
455
            on_exit (str or list): callbacks to trigger when the state is
456
                exited. Only valid if first argument is string.
457
            ignore_invalid_triggers: when True, any calls to trigger methods
458
                that are not valid for the present state (e.g., calling an
459
                a_to_b() trigger when the current state is c) will be silently
460
                ignored rather than raising an invalid transition exception.
461
                Note that this argument takes precedence over the same
462
                argument defined at the Machine level, and is in turn
463
                overridden by any ignore_invalid_triggers explicitly
464
                passed in an individual state's initialization arguments.
465
            **kwargs additional keyword arguments used by state mixins.
466
        """
467
        remap = kwargs.pop('remap', None)
1✔
468
        ignore = self.ignore_invalid_triggers if ignore_invalid_triggers is None else ignore_invalid_triggers
1✔
469

470
        for state in listify(states):
1✔
471
            if isinstance(state, Enum):
1✔
472
                if isinstance(state.value, EnumMeta):
1✔
473
                    state = {'name': state, 'children': state.value}
1✔
474
                elif isinstance(state.value, dict):
1✔
475
                    state = dict(name=state, **state.value)
1✔
476
            if isinstance(state, string_types):
1✔
477
                self._add_string_state(state, on_enter, on_exit, ignore, remap, **kwargs)
1✔
478
            elif isinstance(state, Enum):
1✔
479
                self._add_enum_state(state, on_enter, on_exit, ignore, remap, **kwargs)
1✔
480
            elif isinstance(state, dict):
1✔
481
                self._add_dict_state(state, ignore, remap, **kwargs)
1✔
482
            elif isinstance(state, NestedState):
1✔
483
                if state.name in self.states:
1✔
484
                    raise ValueError("State {0} cannot be added since it already exists.".format(state.name))
1✔
485
                self.states[state.name] = state
1✔
486
                self._init_state(state)
1✔
487
            elif isinstance(state, HierarchicalMachine):
1✔
488
                self._add_machine_states(state, remap)
1✔
489
            elif isinstance(state, State) and not isinstance(state, NestedState):
1✔
490
                raise ValueError("A passed state object must derive from NestedState! "
×
491
                                 "A default State object is not sufficient")
492
            else:
493
                raise ValueError("Cannot add state of type {0}. ".format(type(state).__name__))
1✔
494

495
    def add_transition(self, trigger, source, dest, conditions=None,
1✔
496
                       unless=None, before=None, after=None, prepare=None, **kwargs):
497
        if source == self.wildcard_all and dest == self.wildcard_same:
1✔
498
            source = self.get_nested_state_names()
1✔
499
        else:
500
            if source != self.wildcard_all:
1✔
501
                source = [self.state_cls.separator.join(self._get_enum_path(s)) if isinstance(s, Enum) else s
1✔
502
                          for s in listify(source)]
503
            if dest != self.wildcard_same:
1✔
504
                dest = self.state_cls.separator.join(self._get_enum_path(dest)) if isinstance(dest, Enum) else dest
1✔
505
        super(HierarchicalMachine, self).add_transition(trigger, source, dest, conditions,
1✔
506
                                                        unless, before, after, prepare, **kwargs)
507

508
    def get_global_name(self, state=None, join=True):
1✔
509
        """Returns the name of the passed state in context of the current prefix/scope.
510
        Args:
511
            state (str, Enum or NestedState): The state to be analyzed.
512
            join (bool): Whether this method should join the path elements or not
513
        Returns:
514
            str or list(str) of the global state name
515
        """
516
        domains = copy.copy(self.prefix_path)
1✔
517
        if state:
1✔
518
            state_name = state.name if hasattr(state, 'name') else state
1✔
519
            if state_name in self.states:
1✔
520
                domains.append(state_name)
1✔
521
            else:
522
                raise ValueError("State '{0}' not found in local states.".format(state))
×
523
        return self.state_cls.separator.join(domains) if join else domains
1✔
524

525
    def get_nested_state_names(self):
1✔
526
        """Returns a list of global names of all states of a machine.
527
        Returns:
528
            list(str) of global state names.
529
        """
530
        ordered_states = []
1✔
531
        for state in self.states.values():
1✔
532
            ordered_states.append(self.get_global_name(state))
1✔
533
            with self(state.name):
1✔
534
                ordered_states.extend(self.get_nested_state_names())
1✔
535
        return ordered_states
1✔
536

537
    def get_nested_transitions(self, trigger="", src_path=None, dest_path=None):
1✔
538
        """Retrieves a list of all transitions matching the passed requirements.
539
        Args:
540
            trigger (str): If set, return only transitions related to this trigger.
541
            src_path (list(str)): If set, return only transitions with this source state.
542
            dest_path (list(str)): If set, return only transitions with this destination.
543

544
        Returns:
545
            list(NestedTransitions) of valid transitions.
546
        """
547
        if src_path and dest_path:
1✔
548
            src = self.state_cls.separator.join(src_path)
1✔
549
            dest = self.state_cls.separator.join(dest_path)
1✔
550
            transitions = super(HierarchicalMachine, self).get_transitions(trigger, src, dest)
1✔
551
            if len(src_path) > 1 and len(dest_path) > 1:
1✔
552
                with self(src_path[0]):
1✔
553
                    transitions.extend(self.get_nested_transitions(trigger, src_path[1:], dest_path[1:]))
1✔
554
        elif src_path:
1✔
555
            src = self.state_cls.separator.join(src_path)
1✔
556
            transitions = super(HierarchicalMachine, self).get_transitions(trigger, src, "*")
1✔
557
            if len(src_path) > 1:
1✔
558
                with self(src_path[0]):
1✔
559
                    transitions.extend(self.get_nested_transitions(trigger, src_path[1:], None))
1✔
560
        elif dest_path:
1✔
561
            dest = self.state_cls.separator.join(dest_path)
1✔
562
            transitions = super(HierarchicalMachine, self).get_transitions(trigger, "*", dest)
1✔
563
            if len(dest_path) > 1:
1✔
564
                for state_name in self.states:
1✔
565
                    with self(state_name):
1✔
566
                        transitions.extend(self.get_nested_transitions(trigger, None, dest_path[1:]))
1✔
567
        else:
568
            transitions = super(HierarchicalMachine, self).get_transitions(trigger, "*", "*")
1✔
569
            for state_name in self.states:
1✔
570
                with self(state_name):
1✔
571
                    transitions.extend(self.get_nested_transitions(trigger, None, None))
1✔
572
        return transitions
1✔
573

574
    def get_nested_triggers(self, src_path=None):
1✔
575
        """Retrieves a list of valid triggers.
576
        Args:
577
            src_path (list(str)): A list representation of the source state's name.
578
        Returns:
579
            list(str) of valid trigger names.
580
        """
581
        if src_path:
1✔
582
            triggers = super(HierarchicalMachine, self).get_triggers(self.state_cls.separator.join(src_path))
1✔
583
            if len(src_path) > 1 and src_path[0] in self.states:
1✔
584
                with self(src_path[0]):
1✔
585
                    triggers.extend(self.get_nested_triggers(src_path[1:]))
1✔
586
        else:
587
            triggers = list(self.events.keys())
1✔
588
            for state_name in self.states:
1✔
589
                with self(state_name):
1✔
590
                    triggers.extend(self.get_nested_triggers())
1✔
591
        return triggers
1✔
592

593
    def get_state(self, state, hint=None):
1✔
594
        """Return the State instance with the passed name.
595
        Args:
596
            state (str, Enum or list(str)): A state name, enum or state path
597
            hint (list(str)): A state path to check for the state in question
598
        Returns:
599
            NestedState that belongs to the passed str (list) or Enum.
600
        """
601
        if isinstance(state, Enum):
1✔
602
            state = self._get_enum_path(state)
1✔
603
        elif isinstance(state, string_types):
1✔
604
            state = state.split(self.state_cls.separator)
1✔
605
        if not hint:
1✔
606
            state = copy.copy(state)
1✔
607
            hint = copy.copy(state)
1✔
608
        if len(state) > 1:
1✔
609
            child = state.pop(0)
1✔
610
            try:
1✔
611
                with self(child):
1✔
612
                    return self.get_state(state, hint)
1✔
613
            except (KeyError, ValueError):
1✔
614
                try:
1✔
615
                    with self():
1✔
616
                        state = self
1✔
617
                        for elem in hint:
1✔
618
                            state = state.states[elem]
1✔
619
                        return state
1✔
620
                except KeyError:
1✔
621
                    raise ValueError(
1✔
622
                        "State '%s' is not a registered state." % self.state_cls.separator.join(hint)
623
                    )  # from KeyError
624
        elif state[0] not in self.states:
1✔
625
            raise ValueError("State '%s' is not a registered state." % state)
1✔
626
        return self.states[state[0]]
1✔
627

628
    def get_states(self, states):
1✔
629
        """Retrieves a list of NestedStates.
630
        Args:
631
            states (str, Enum or list of str or Enum): Names/values of the states to retrieve.
632
        Returns:
633
            list(NestedStates) belonging to the passed identifiers.
634
        """
635
        res = []
1✔
636
        for state in states:
1✔
637
            if isinstance(state, list):
1✔
638
                res.append(self.get_states(state))
1✔
639
            else:
640
                res.append(self.get_state(state))
1✔
641
        return res
1✔
642

643
    def get_transitions(self, trigger="", source="*", dest="*", delegate=False):
1✔
644
        """Return the transitions from the Machine.
645
        Args:
646
            trigger (str): Trigger name of the transition.
647
            source (str, State or Enum): Limits list to transitions from a certain state.
648
            dest (str, State or Enum): Limits list to transitions to a certain state.
649
            delegate (Optional[bool]): If True, consider delegations to parents of source
650
        Returns:
651
            list(NestedTransitions): All transitions matching the request.
652
        """
653
        with self():
1✔
654
            source_path = [] if source == "*" \
1✔
655
                else source.split(self.state_cls.separator) if isinstance(source, string_types) \
656
                else self._get_enum_path(source) if isinstance(source, Enum) \
657
                else self._get_state_path(source)
658
            dest_path = [] if dest == "*" \
1✔
659
                else dest.split(self.state_cls.separator) if isinstance(dest, string_types) \
660
                else self._get_enum_path(dest) if isinstance(dest, Enum) \
661
                else self._get_state_path(dest)
662
            matches = self.get_nested_transitions(trigger, source_path, dest_path)
1✔
663
            # only consider delegations when source_path contains a nested state (len > 1)
664
            if delegate is False or len(source_path) < 2:
1✔
665
                return matches
1✔
666
            source_path.pop()
1✔
667
            while source_path:
1✔
668
                matches.extend(self.get_transitions(trigger,
1✔
669
                                                    source=self.state_cls.separator.join(source_path),
670
                                                    dest=dest))
671
                source_path.pop()
1✔
672
            return matches
1✔
673

674
    def _can_trigger(self, model, trigger, *args, **kwargs):
1✔
675
        state_tree = self.build_state_tree(getattr(model, self.model_attribute), self.state_cls.separator)
1✔
676
        ordered_states = resolve_order(state_tree)
1✔
677
        for state_path in ordered_states:
1✔
678
            with self():
1✔
679
                return self._can_trigger_nested(model, trigger, state_path, *args, **kwargs)
1✔
680

681
    def _can_trigger_nested(self, model, trigger, path, *args, **kwargs):
1✔
682
        evt = NestedEventData(None, None, self, model, args, kwargs)
1✔
683
        if trigger in self.events:
1✔
684
            source_path = copy.copy(path)
1✔
685
            while source_path:
1✔
686
                state_name = self.state_cls.separator.join(source_path)
1✔
687
                for transition in self.events[trigger].transitions.get(state_name, []):
1✔
688
                    try:
1✔
689
                        _ = self.get_state(transition.dest)
1✔
690
                    except ValueError:
1✔
691
                        continue
1✔
692
                    self.callbacks(self.prepare_event, evt)
1✔
693
                    self.callbacks(transition.prepare, evt)
1✔
694
                    if all(c.check(evt) for c in transition.conditions):
1✔
695
                        return True
1✔
696
                source_path.pop(-1)
1✔
697
        if path:
1✔
698
            with self(path.pop(0)):
1✔
699
                return self._can_trigger_nested(model, trigger, path, *args, **kwargs)
1✔
700
        return False
1✔
701

702
    def get_triggers(self, *args):
1✔
703
        """Extends transitions.core.Machine.get_triggers to also include parent state triggers."""
704
        triggers = []
1✔
705
        with self():
1✔
706
            for state in args:
1✔
707
                state_name = state.name if hasattr(state, 'name') else state
1✔
708
                state_path = state_name.split(self.state_cls.separator)
1✔
709
                if len(state_path) > 1:  # we only need to check substates when 'state_name' refers to a substate
1✔
710
                    with self(state_path[0]):
1✔
711
                        triggers.extend(self.get_nested_triggers(state_path[1:]))
1✔
712
                while state_path:  # check all valid transitions for parent states
1✔
713
                    triggers.extend(super(HierarchicalMachine, self).get_triggers(
1✔
714
                        self.state_cls.separator.join(state_path)))
715
                    state_path.pop()
1✔
716
        return triggers
1✔
717

718
    def has_trigger(self, trigger, state=None):
1✔
719
        """Check whether an event/trigger is known to the machine
720
        Args:
721
            trigger (str): Event/trigger name
722
            state (optional[NestedState]): Limits the recursive search to this state and its children
723
        Returns:
724
            bool: True if event is known and False otherwise
725
        """
726

727
        state = state or self
1✔
728
        return trigger in state.events or any(self.has_trigger(trigger, sta) for sta in state.states.values())
1✔
729

730
    def is_state(self, state, model, allow_substates=False):
1✔
731
        if allow_substates:
1✔
732
            current = getattr(model, self.model_attribute)
1✔
733
            current_name = self.state_cls.separator.join(self._get_enum_path(current))\
1✔
734
                if isinstance(current, Enum) else current
735
            state_name = self.state_cls.separator.join(self._get_enum_path(state))\
1✔
736
                if isinstance(state, Enum) else state
737
            return current_name.startswith(state_name)
1✔
738
        return getattr(model, self.model_attribute) == state
1✔
739

740
    def on_enter(self, state_name, callback):
1✔
741
        """Helper function to add callbacks to states in case a custom state separator is used.
742
        Args:
743
            state_name (str): Name of the state
744
            callback (str or callable): Function to be called. Strings will be resolved to model functions.
745
        """
746
        self.get_state(state_name).add_callback('enter', callback)
1✔
747

748
    def on_exit(self, state_name, callback):
1✔
749
        """Helper function to add callbacks to states in case a custom state separator is used.
750
        Args:
751
            state_name (str): Name of the state
752
            callback (str or callable): Function to be called. Strings will be resolved to model functions.
753
        """
754
        self.get_state(state_name).add_callback('exit', callback)
1✔
755

756
    def set_state(self, state, model=None):
1✔
757
        """Set the current state.
758
        Args:
759
            state (list of str or Enum or State): value of state(s) to be set
760
            model (optional[object]): targeted model; if not set, all models will be set to 'state'
761
        """
762
        values = [self._set_state(value) for value in listify(state)]
1✔
763
        models = self.models if model is None else listify(model)
1✔
764
        for mod in models:
1✔
765
            setattr(mod, self.model_attribute, values if len(values) > 1 else values[0])
1✔
766

767
    def to_state(self, model, state_name, *args, **kwargs):
1✔
768
        """Helper function to add go to states in case a custom state separator is used.
769
        Args:
770
            model (class): The model that should be used.
771
            state_name (str): Name of the destination state.
772
        """
773

774
        current_state = getattr(model, self.model_attribute)
1✔
775
        if isinstance(current_state, list):
1✔
776
            raise MachineError("Cannot use 'to_state' from parallel state")
1✔
777

778
        event = NestedEventData(self.get_state(current_state), Event('to', self), self,
1✔
779
                                model, args=args, kwargs=kwargs)
780
        if isinstance(current_state, Enum):
1✔
781
            event.source_path = self._get_enum_path(current_state)
1✔
782
            event.source_name = self.state_cls.separator.join(event.source_path)
1✔
783
        else:
784
            event.source_name = current_state
1✔
785
            event.source_path = current_state.split(self.state_cls.separator)
1✔
786
        self._create_transition(event.source_name, state_name).execute(event)
1✔
787

788
    def trigger_event(self, model, trigger, *args, **kwargs):
1✔
789
        """Processes events recursively and forwards arguments if suitable events are found.
790
        This function is usually bound to models with model and trigger arguments already
791
        resolved as a partial. Execution will halt when a nested transition has been executed
792
        successfully.
793
        Args:
794
            model (object): targeted model
795
            trigger (str): event name
796
            *args: positional parameters passed to the event and its callbacks
797
            **kwargs: keyword arguments passed to the event and its callbacks
798
        Returns:
799
            bool: whether a transition has been executed successfully
800
        Raises:
801
            MachineError: When no suitable transition could be found and ignore_invalid_trigger
802
                          is not True. Note that a transition which is not executed due to conditions
803
                          is still considered valid.
804
        """
805
        event_data = NestedEventData(state=None, event=None, machine=self, model=model, args=args, kwargs=kwargs)
1✔
806
        event_data.result = None
1✔
807

808
        return self._process(partial(self._trigger_event, event_data, trigger))
1✔
809

810
    def _trigger_event(self, event_data, trigger):
1✔
811
        try:
1✔
812
            with self():
1✔
813
                res = self._trigger_event_nested(event_data, trigger, None)
1✔
814
            event_data.result = self._check_event_result(res, event_data.model, trigger)
1✔
815
        except BaseException as err:  # pylint: disable=broad-except; Exception will be handled elsewhere
1✔
816
            event_data.error = err
1✔
817
            if self.on_exception:
1✔
818
                self.callbacks(self.on_exception, event_data)
1✔
819
            else:
820
                raise
1✔
821
        finally:
822
            try:
1✔
823
                self.callbacks(self.finalize_event, event_data)
1✔
824
                _LOGGER.debug("%sExecuted machine finalize callbacks", self.name)
1✔
825
            except BaseException as err:  # pylint: disable=broad-except; Exception will be handled elsewhere
1✔
826
                _LOGGER.error("%sWhile executing finalize callbacks a %s occurred: %s.",
1✔
827
                              self.name,
828
                              type(err).__name__,
829
                              str(err))
830
        return event_data.result
1✔
831

832
    def _add_model_to_state(self, state, model):
1✔
833
        name = self.get_global_name(state)
1✔
834
        if self.state_cls.separator == '_':
1✔
835
            value = state.value if isinstance(state.value, Enum) else name
1✔
836
            self._checked_assignment(model, 'is_%s' % name, partial(self.is_state, value, model))
1✔
837
            # Add dynamic method callbacks (enter/exit) if there are existing bound methods in the model
838
            # except if they are already mentioned in 'on_enter/exit' of the defined state
839
            for callback in self.state_cls.dynamic_methods:
1✔
840
                method = "{0}_{1}".format(callback, name)
1✔
841
                if hasattr(model, method) and inspect.ismethod(getattr(model, method)) and \
1✔
842
                        method not in getattr(state, callback):
843
                    state.add_callback(callback[3:], method)
1✔
844
        else:
845
            path = name.split(self.state_cls.separator)
1✔
846
            value = state.value if isinstance(state.value, Enum) else name
1✔
847
            trig_func = partial(self.is_state, value, model)
1✔
848
            if hasattr(model, 'is_' + path[0]):
1✔
849
                getattr(model, 'is_' + path[0]).add(trig_func, path[1:])
1✔
850
            else:
851
                assert not path[1:], "nested path should be empty"
1✔
852
                self._checked_assignment(model, 'is_' + path[0], FunctionWrapper(trig_func))
1✔
853
        with self(state.name):
1✔
854
            for event in self.events.values():
1✔
855
                if not hasattr(model, event.name):
1✔
856
                    self._add_trigger_to_model(event.name, model)
1✔
857
            for a_state in self.states.values():
1✔
858
                self._add_model_to_state(a_state, model)
1✔
859

860
    def _add_dict_state(self, state, ignore_invalid_triggers, remap, **kwargs):
1✔
861
        if remap is not None and state['name'] in remap:
1✔
862
            return
1✔
863
        state = state.copy()  # prevent messing with the initially passed dict
1✔
864
        remap = state.pop('remap', None)
1✔
865
        if 'ignore_invalid_triggers' not in state:
1✔
866
            state['ignore_invalid_triggers'] = ignore_invalid_triggers
1✔
867

868
        # parallel: [states] is just a short handle for {children: [states], initial: [state_names]}
869
        state_parallel = state.pop('parallel', [])
1✔
870
        if state_parallel:
1✔
871
            state_children = state_parallel
1✔
872
            state['initial'] = [s['name'] if isinstance(s, dict)
1✔
873
                                else s for s in state_children]
874
        else:
875
            state_children = state.pop('children', state.pop('states', []))
1✔
876
        transitions = state.pop('transitions', [])
1✔
877
        new_state = self._create_state(**state)
1✔
878
        self.states[new_state.name] = new_state
1✔
879
        self._init_state(new_state)
1✔
880
        remapped_transitions = []
1✔
881
        with self(new_state.name):
1✔
882
            self.add_states(state_children, remap=remap, **kwargs)
1✔
883
            if transitions:
1✔
884
                self.add_transitions(transitions)
1✔
885
            if remap is not None:
1✔
886
                remapped_transitions.extend(self._remap_state(new_state, remap))
1✔
887

888
        self.add_transitions(remapped_transitions)
1✔
889

890
    def _add_enum_state(self, state, on_enter, on_exit, ignore_invalid_triggers, remap, **kwargs):
1✔
891
        if remap is not None and state.name in remap:
1✔
892
            return
×
893
        if self.state_cls.separator in state.name:
1✔
894
            raise ValueError("State '{0}' contains '{1}' which is used as state name separator. "
1✔
895
                             "Consider changing the NestedState.separator to avoid this issue."
896
                             "".format(state.name, self.state_cls.separator))
897
        if state.name in self.states:
1✔
898
            raise ValueError("State {0} cannot be added since it already exists.".format(state.name))
1✔
899
        new_state = self._create_state(state, on_enter=on_enter, on_exit=on_exit,
1✔
900
                                       ignore_invalid_triggers=ignore_invalid_triggers, **kwargs)
901
        self.states[new_state.name] = new_state
1✔
902
        self._init_state(new_state)
1✔
903

904
    def _add_machine_states(self, state, remap):
1✔
905
        new_states = [s for s in state.states.values() if remap is None or s not in remap]
1✔
906
        self.add_states(new_states)
1✔
907
        for evt in state.events.values():
1✔
908
            self.events[evt.name] = evt
1✔
909
        if self.scoped.initial is None:
1✔
910
            self.scoped.initial = state.initial
1✔
911

912
    def _add_string_state(self, state, on_enter, on_exit, ignore_invalid_triggers, remap, **kwargs):
1✔
913
        if remap is not None and state in remap:
1✔
914
            return
1✔
915
        domains = state.split(self.state_cls.separator, 1)
1✔
916
        if len(domains) > 1:
1✔
917
            try:
1✔
918
                self.get_state(domains[0])
1✔
919
            except ValueError:
1✔
920
                self.add_state(domains[0], on_enter=on_enter, on_exit=on_exit,
1✔
921
                               ignore_invalid_triggers=ignore_invalid_triggers, **kwargs)
922
            with self(domains[0]):
1✔
923
                self.add_states(domains[1], on_enter=on_enter, on_exit=on_exit,
1✔
924
                                ignore_invalid_triggers=ignore_invalid_triggers, **kwargs)
925
        else:
926
            if state in self.states:
1✔
927
                raise ValueError("State {0} cannot be added since it already exists.".format(state))
1✔
928
            new_state = self._create_state(state, on_enter=on_enter, on_exit=on_exit,
1✔
929
                                           ignore_invalid_triggers=ignore_invalid_triggers, **kwargs)
930
            self.states[new_state.name] = new_state
1✔
931
            self._init_state(new_state)
1✔
932

933
    def _add_trigger_to_model(self, trigger, model):
1✔
934
        trig_func = partial(self.trigger_event, model, trigger)
1✔
935
        self._add_may_transition_func_for_trigger(trigger, model)
1✔
936
        # FunctionWrappers are only necessary if a custom separator is used
937
        if trigger.startswith('to_') and self.state_cls.separator != '_':
1✔
938
            path = trigger[3:].split(self.state_cls.separator)
1✔
939
            if hasattr(model, 'to_' + path[0]):
1✔
940
                # add path to existing function wrapper
941
                getattr(model, 'to_' + path[0]).add(trig_func, path[1:])
1✔
942
            else:
943
                # create a new function wrapper
944
                self._checked_assignment(model, 'to_' + path[0], FunctionWrapper(trig_func))
1✔
945
        else:
946
            self._checked_assignment(model, trigger, trig_func)
1✔
947

948
    def build_state_tree(self, model_states, separator, tree=None):
1✔
949
        """Converts a list of current states into a hierarchical state tree.
950
        Args:
951
            model_states (str or list(str)):
952
            separator (str): The character used to separate state names
953
            tree (OrderedDict): The current branch to use. If not passed, create a new tree.
954
        Returns:
955
            OrderedDict: A state tree dictionary
956
        """
957
        tree = tree if tree is not None else OrderedDict()
1✔
958
        if isinstance(model_states, list):
1✔
959
            for state in model_states:
1✔
960
                _ = self.build_state_tree(state, separator, tree)
1✔
961
        else:
962
            tmp = tree
1✔
963
            if isinstance(model_states, (Enum, EnumMeta)):
1✔
964
                with self():
1✔
965
                    path = self._get_enum_path(model_states)
1✔
966
            else:
967
                path = model_states.split(separator)
1✔
968
            for elem in path:
1✔
969
                tmp = tmp.setdefault(elem.name if hasattr(elem, 'name') else elem, OrderedDict())
1✔
970
        return tree
1✔
971

972
    def _get_enum_path(self, enum_state, prefix=None):
1✔
973
        prefix = prefix or []
1✔
974
        if enum_state.name in self.states and self.states[enum_state.name].value == enum_state:
1✔
975
            return prefix + [enum_state.name]
1✔
976
        for name in self.states:
1✔
977
            with self(name):
1✔
978
                res = self._get_enum_path(enum_state, prefix=prefix + [name])
1✔
979
                if res:
1✔
980
                    return res
1✔
981
        # if we reach this point without a prefix, we looped over all nested states
982
        # and could not find a suitable enum state
983
        if not prefix:
1✔
984
            raise ValueError("Could not find path of {0}.".format(enum_state))
1✔
985
        return None
1✔
986

987
    def _get_state_path(self, state, prefix=None):
1✔
988
        prefix = prefix or []
1✔
989
        if state in self.states.values():
1✔
990
            return prefix + [state.name]
1✔
991
        for name in self.states:
1✔
992
            with self(name):
1✔
993
                res = self._get_state_path(state, prefix=prefix + [name])
1✔
994
                if res:
1✔
995
                    return res
1✔
996
        return []
1✔
997

998
    def _check_event_result(self, res, model, trigger):
1✔
999
        if res is None:
1✔
1000
            state_names = getattr(model, self.model_attribute)
1✔
1001
            msg = "%sCan't trigger event '%s' from state(s) %s!" % (self.name, trigger, state_names)
1✔
1002
            for state_name in listify(state_names):
1✔
1003
                state = self.get_state(state_name)
1✔
1004
                ignore = state.ignore_invalid_triggers if state.ignore_invalid_triggers is not None \
1✔
1005
                    else self.ignore_invalid_triggers
1006
                if not ignore:
1✔
1007
                    # determine whether a MachineError (valid event but invalid state) ...
1008
                    if self.has_trigger(trigger):
1✔
1009
                        raise MachineError(msg)
1✔
1010
                    # or AttributeError (invalid event) is appropriate
1011
                    raise AttributeError("Do not know event named '%s'." % trigger)
1✔
1012
            _LOGGER.warning(msg)
1✔
1013
            res = False
1✔
1014
        return res
1✔
1015

1016
    def _get_trigger(self, model, trigger_name, *args, **kwargs):
1✔
1017
        """Convenience function added to the model to trigger events by name.
1018
        Args:
1019
            model (object): Model with assigned event trigger.
1020
            trigger_name (str): Name of the trigger to be called.
1021
            *args: Variable length argument list which is passed to the triggered event.
1022
            **kwargs: Arbitrary keyword arguments which is passed to the triggered event.
1023
        Returns:
1024
            bool: True if a transitions has been conducted or the trigger event has been queued.
1025
        """
1026
        return self.trigger_event(model, trigger_name, *args, **kwargs)
1✔
1027

1028
    def _has_state(self, state, raise_error=False):
1✔
1029
        """This function
1030
        Args:
1031
            state (NestedState): state to be tested
1032
            raise_error (bool): whether ValueError should be raised when the state
1033
                                is not registered
1034
       Returns:
1035
            bool: Whether state is registered in the machine
1036
        Raises:
1037
            ValueError: When raise_error is True and state is not registered
1038
        """
1039
        found = super(HierarchicalMachine, self)._has_state(state)
1✔
1040
        if not found:
1✔
1041
            for a_state in self.states:
1✔
1042
                with self(a_state):
1✔
1043
                    if self._has_state(state):
1✔
1044
                        return True
×
1045
        if not found and raise_error:
1✔
1046
            msg = 'State %s has not been added to the machine' % (state.name if hasattr(state, 'name') else state)
1✔
1047
            raise ValueError(msg)
1✔
1048
        return found
1✔
1049

1050
    def _init_state(self, state):
1✔
1051
        for model in self.models:
1✔
1052
            self._add_model_to_state(state, model)
1✔
1053
        if self.auto_transitions:
1✔
1054
            state_name = self.get_global_name(state.name)
1✔
1055
            parent = state_name.split(self.state_cls.separator, 1)
1✔
1056
            with self():
1✔
1057
                for a_state in self.get_nested_state_names():
1✔
1058
                    if a_state == parent[0]:
1✔
1059
                        self.add_transition('to_%s' % state_name, self.wildcard_all, state_name)
1✔
1060
                    elif len(parent) == 1:
1✔
1061
                        self.add_transition('to_%s' % a_state, state_name, a_state)
1✔
1062
        with self(state.name):
1✔
1063
            for substate in self.states.values():
1✔
1064
                self._init_state(substate)
1✔
1065

1066
    def _recursive_initial(self, value):
1✔
1067
        if isinstance(value, string_types):
1✔
1068
            path = value.split(self.state_cls.separator, 1)
1✔
1069
            if len(path) > 1:
1✔
1070
                state_name, suffix = path
1✔
1071
                # make sure the passed state has been created already
1072
                super(HierarchicalMachine, self.__class__).initial.fset(self, state_name)
1✔
1073
                with self(state_name):
1✔
1074
                    self.initial = suffix
1✔
1075
                    self._initial = state_name + self.state_cls.separator + self._initial
1✔
1076
            else:
1077
                super(HierarchicalMachine, self.__class__).initial.fset(self, value)
1✔
1078
        elif isinstance(value, (list, tuple)):
1✔
1079
            return [self._recursive_initial(v) for v in value]
1✔
1080
        else:
1081
            super(HierarchicalMachine, self.__class__).initial.fset(self, value)
1✔
1082
        return self._initial[0] if isinstance(self._initial, list) and len(self._initial) == 1 else self._initial
1✔
1083

1084
    def _remap_state(self, state, remap):
1✔
1085
        drop_event = []
1✔
1086
        remapped_transitions = []
1✔
1087
        for evt in self.events.values():
1✔
1088
            self.events[evt.name] = copy.copy(evt)
1✔
1089
        for trigger, event in self.events.items():
1✔
1090
            drop_source = []
1✔
1091
            event.transitions = copy.deepcopy(event.transitions)
1✔
1092
            for source_name, trans_source in event.transitions.items():
1✔
1093
                if source_name in remap:
1✔
1094
                    drop_source.append(source_name)
1✔
1095
                    continue
1✔
1096
                drop_trans = []
1✔
1097
                for trans in trans_source:
1✔
1098
                    if trans.dest in remap:
1✔
1099
                        conditions, unless = [], []
1✔
1100
                        for cond in trans.conditions:
1✔
1101
                            # split a list in two lists based on the accessors (cond.target) truth value
1102
                            (unless, conditions)[cond.target].append(cond.func)
1✔
1103
                        remapped_transitions.append({
1✔
1104
                            'trigger': trigger,
1105
                            'source': state.name + self.state_cls.separator + trans.source,
1106
                            'dest': remap[trans.dest],
1107
                            'conditions': conditions,
1108
                            'unless': unless,
1109
                            'prepare': trans.prepare,
1110
                            'before': trans.before,
1111
                            'after': trans.after})
1112
                        drop_trans.append(trans)
1✔
1113
                for d_trans in drop_trans:
1✔
1114
                    trans_source.remove(d_trans)
1✔
1115
                if not trans_source:
1✔
1116
                    drop_source.append(source_name)
1✔
1117
            for d_source in drop_source:
1✔
1118
                del event.transitions[d_source]
1✔
1119
            if not event.transitions:
1✔
1120
                drop_event.append(trigger)
1✔
1121
        for d_event in drop_event:
1✔
1122
            del self.events[d_event]
1✔
1123
        return remapped_transitions
1✔
1124

1125
    def _resolve_initial(self, models, state_name_path, prefix=None):
1✔
1126
        prefix = prefix or []
1✔
1127
        if state_name_path:
1✔
1128
            state_name = state_name_path.pop(0)
1✔
1129
            with self(state_name):
1✔
1130
                return self._resolve_initial(models, state_name_path, prefix=prefix + [state_name])
1✔
1131
        if self.scoped.initial:
1✔
1132
            entered_states = []
1✔
1133
            for initial_state_name in listify(self.scoped.initial):
1✔
1134
                with self(initial_state_name):
1✔
1135
                    entered_states.append(self._resolve_initial(models, [], prefix=prefix + [self.scoped.name]))
1✔
1136
            return entered_states if len(entered_states) > 1 else entered_states[0]
1✔
1137
        return self.state_cls.separator.join(prefix)
1✔
1138

1139
    def _set_state(self, state_name):
1✔
1140
        if isinstance(state_name, list):
1✔
1141
            return [self._set_state(value) for value in state_name]
1✔
1142
        a_state = self.get_state(state_name)
1✔
1143
        return a_state.value if isinstance(a_state.value, Enum) else state_name
1✔
1144

1145
    def _trigger_event_nested(self, event_data, trigger, _state_tree):
1✔
1146
        model = event_data.model
1✔
1147
        if _state_tree is None:
1✔
1148
            _state_tree = self.build_state_tree(listify(getattr(model, self.model_attribute)),
1✔
1149
                                                self.state_cls.separator)
1150
        res = {}
1✔
1151
        for key, value in _state_tree.items():
1✔
1152
            if value:
1✔
1153
                with self(key):
1✔
1154
                    tmp = self._trigger_event_nested(event_data, trigger, value)
1✔
1155
                    if tmp is not None:
1✔
1156
                        res[key] = tmp
1✔
1157
            if res.get(key, False) is False and trigger in self.events:
1✔
1158
                event_data.event = self.events[trigger]
1✔
1159
                tmp = event_data.event.trigger_nested(event_data)
1✔
1160
                if tmp is not None:
1✔
1161
                    res[key] = tmp
1✔
1162
        return None if not res or all(v is None for v in res.values()) else any(res.values())
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc