• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pytransitions / transitions / 8938991339

03 May 2024 12:30PM UTC coverage: 98.432% (+0.2%) from 98.217%
8938991339

push

github

aleneum
use coverage only for mypy job and update setup.py tags

5149 of 5231 relevant lines covered (98.43%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.35
/transitions/extensions/markup.py
1
"""
1✔
2
    transitions.extensions.markup
3
    -----------------------------
4

5
    This module extends machines with markup functionality that can be used to retrieve the current machine
6
    configuration as a dictionary. This is used as the foundation for diagram generation with Graphviz but can
7
    also be used to store and transfer machines.
8
"""
9

10
from functools import partial
1✔
11
import importlib
1✔
12
import itertools
1✔
13
import numbers
1✔
14

15
from six import string_types, iteritems
1✔
16

17
try:
1✔
18
    # Enums are supported for Python 3.4+ and Python 2.7 with enum34 package installed
19
    from enum import Enum, EnumMeta
1✔
20
except ImportError:  # pragma: no cover
21
    # If enum is not available, create dummy classes for type checks
22
    # typing must be prevent redefinition issues with mypy
23
    class Enum:  # type:ignore
24
        """This is just an Enum stub for Python 2 and Python 3.3 and before without Enum support."""
25

26
    class EnumMeta:  # type:ignore
27
        """This is just an EnumMeta stub for Python 2 and Python 3.3 and before without Enum support."""
28

29
from ..core import Machine
1✔
30
from .nesting import HierarchicalMachine
1✔
31

32

33
class MarkupMachine(Machine):
1✔
34
    """Extends transitions.core.Machine with the capability to generate a dictionary representation of itself,
1✔
35
    its events, states and models.
36
    """
37

38
    # Special attributes such as NestedState._name/_parent or Transition._condition are handled differently
39
    state_attributes = ['on_exit', 'on_enter', 'ignore_invalid_triggers', 'timeout', 'on_timeout', 'tags', 'label']
1✔
40
    transition_attributes = ['source', 'dest', 'prepare', 'before', 'after', 'label']
1✔
41

42
    def __init__(self, model=Machine.self_literal, states=None, initial='initial', transitions=None,
1✔
43
                 send_event=False, auto_transitions=True,
44
                 ordered_transitions=False, ignore_invalid_triggers=None,
45
                 before_state_change=None, after_state_change=None, name=None,
46
                 queued=False, prepare_event=None, finalize_event=None, model_attribute='state', on_exception=None,
47
                 markup=None, auto_transitions_markup=False, **kwargs):
48
        self._markup = markup or {}
1✔
49
        self._auto_transitions_markup = auto_transitions_markup
1✔
50
        self._needs_update = True
1✔
51

52
        if self._markup:
1✔
53
            # remove models from config to process them AFTER the base machine has been initialized
54
            models = self._markup.pop('models', [])
1✔
55
            super(MarkupMachine, self).__init__(model=None, **self._markup)
1✔
56
            for mod in models:
1✔
57
                self._add_markup_model(mod)
1✔
58
        else:
59
            super(MarkupMachine, self).__init__(
1✔
60
                model=model, states=states, initial=initial, transitions=transitions,
61
                send_event=send_event, auto_transitions=auto_transitions,
62
                ordered_transitions=ordered_transitions, ignore_invalid_triggers=ignore_invalid_triggers,
63
                before_state_change=before_state_change, after_state_change=after_state_change, name=name,
64
                queued=queued, prepare_event=prepare_event, finalize_event=finalize_event,
65
                model_attribute=model_attribute, on_exception=on_exception, **kwargs
66
            )
67
            self._markup['before_state_change'] = [x for x in (rep(f) for f in self.before_state_change) if x]
1✔
68
            self._markup['after_state_change'] = [x for x in (rep(f) for f in self.before_state_change) if x]
1✔
69
            self._markup['prepare_event'] = [x for x in (rep(f) for f in self.prepare_event) if x]
1✔
70
            self._markup['finalize_event'] = [x for x in (rep(f) for f in self.finalize_event) if x]
1✔
71
            self._markup['send_event'] = self.send_event
1✔
72
            self._markup['auto_transitions'] = self.auto_transitions
1✔
73
            self._markup['ignore_invalid_triggers'] = self.ignore_invalid_triggers
1✔
74
            self._markup['queued'] = self.has_queue
1✔
75

76
    @property
1✔
77
    def auto_transitions_markup(self):
1✔
78
        """Whether auto transitions should be included in the markup."""
79
        return self._auto_transitions_markup
1✔
80

81
    @auto_transitions_markup.setter
1✔
82
    def auto_transitions_markup(self, value):
1✔
83
        """Whether auto transitions should be included in the markup."""
84
        self._auto_transitions_markup = value
1✔
85
        self._needs_update = True
1✔
86

87
    @property
1✔
88
    def markup(self):
1✔
89
        """Returns the machine's configuration as a markup dictionary.
90
        Returns:
91
            dict of machine configuration parameters.
92
        """
93
        self._markup['models'] = self._convert_models()
1✔
94
        return self.get_markup_config()
1✔
95

96
    # the only reason why this not part of markup property is that pickle
97
    # has issues with properties during __setattr__ (self.markup is not set)
98
    def get_markup_config(self):
1✔
99
        """Generates and returns all machine markup parameters except models.
100
        Returns:
101
            dict of machine configuration parameters.
102
        """
103
        if self._needs_update:
1✔
104
            self._convert_states_and_transitions(self._markup)
1✔
105
            self._needs_update = False
1✔
106
        return self._markup
1✔
107

108
    def add_transition(self, trigger, source, dest, conditions=None,
1✔
109
                       unless=None, before=None, after=None, prepare=None, **kwargs):
110
        super(MarkupMachine, self).add_transition(trigger, source, dest, conditions=conditions, unless=unless,
1✔
111
                                                  before=before, after=after, prepare=prepare, **kwargs)
112
        self._needs_update = True
1✔
113

114
    def remove_transition(self, trigger, source="*", dest="*"):
1✔
115
        super(MarkupMachine, self).remove_transition(trigger, source, dest)
1✔
116
        self._needs_update = True
1✔
117

118
    def add_states(self, states, on_enter=None, on_exit=None, ignore_invalid_triggers=None, **kwargs):
1✔
119
        super(MarkupMachine, self).add_states(states, on_enter=on_enter, on_exit=on_exit,
1✔
120
                                              ignore_invalid_triggers=ignore_invalid_triggers, **kwargs)
121
        self._needs_update = True
1✔
122

123
    @staticmethod
1✔
124
    def format_references(func):
1✔
125
        """Creates a string representation of referenced callbacks.
126
        Returns:
127
            str that represents a callback reference.
128
        """
129
        try:
1✔
130
            return func.__name__
1✔
131
        except AttributeError:
1✔
132
            pass
1✔
133
        if isinstance(func, partial):
1✔
134
            return "%s(%s)" % (
1✔
135
                func.func.__name__,
136
                ", ".join(itertools.chain(
137
                    (str(_) for _ in func.args),
138
                    ("%s=%s" % (key, value)
139
                     for key, value in iteritems(func.keywords if func.keywords else {})))))
140
        return str(func)
1✔
141

142
    def _convert_states_and_transitions(self, root):
1✔
143
        state = getattr(self, 'scoped', self)
1✔
144
        if state.initial:
1✔
145
            root['initial'] = state.initial
1✔
146
        if state == self and state.name:
1✔
147
            root['name'] = self.name[:-2]
1✔
148
        self._convert_transitions(root)
1✔
149
        self._convert_states(root)
1✔
150

151
    def _convert_states(self, root):
1✔
152
        key = 'states' if getattr(self, 'scoped', self) == self else 'children'
1✔
153
        root[key] = []
1✔
154
        for state_name, state in self.states.items():
1✔
155
            s_def = _convert(state, self.state_attributes, self.format_references)
1✔
156
            if isinstance(state_name, Enum):
1✔
157
                s_def['name'] = state_name.name
×
158
            else:
159
                s_def['name'] = state_name
1✔
160
            if getattr(state, 'states', []):
1✔
161
                with self(state_name):
1✔
162
                    self._convert_states_and_transitions(s_def)
1✔
163
            root[key].append(s_def)
1✔
164

165
    def _convert_transitions(self, root):
1✔
166
        root['transitions'] = []
1✔
167
        for event in self.events.values():
1✔
168
            if self._omit_auto_transitions(event):
1✔
169
                continue
1✔
170

171
            for transitions in event.transitions.items():
1✔
172
                for trans in transitions[1]:
1✔
173
                    t_def = _convert(trans, self.transition_attributes, self.format_references)
1✔
174
                    t_def['trigger'] = event.name
1✔
175
                    con = [x for x in (rep(f.func, self.format_references) for f in trans.conditions
1✔
176
                                       if f.target) if x]
177
                    unl = [x for x in (rep(f.func, self.format_references) for f in trans.conditions
1✔
178
                                       if not f.target) if x]
179
                    if con:
1✔
180
                        t_def['conditions'] = con
1✔
181
                    if unl:
1✔
182
                        t_def['unless'] = unl
1✔
183
                    root['transitions'].append(t_def)
1✔
184

185
    def _add_markup_model(self, markup):
1✔
186
        initial = markup.get('state', None)
1✔
187
        if markup['class-name'] == 'self':
1✔
188
            self.add_model(self, initial)
1✔
189
        else:
190
            mod_name, cls_name = markup['class-name'].rsplit('.', 1)
1✔
191
            cls = getattr(importlib.import_module(mod_name), cls_name)
1✔
192
            self.add_model(cls(), initial)
1✔
193

194
    def _convert_models(self):
1✔
195
        models = []
1✔
196
        for model in self.models:
1✔
197
            state = getattr(model, self.model_attribute)
1✔
198
            model_def = dict(state=state.name if isinstance(state, Enum) else state)
1✔
199
            model_def['name'] = model.name if hasattr(model, 'name') else str(id(model))
1✔
200
            model_def['class-name'] = 'self' if model == self else model.__module__ + "." + model.__class__.__name__
1✔
201
            models.append(model_def)
1✔
202
        return models
1✔
203

204
    def _omit_auto_transitions(self, event):
1✔
205
        return self.auto_transitions_markup is False and self._is_auto_transition(event)
1✔
206

207
    # auto transition events commonly a) start with the 'to_' prefix, followed by b) the state name
208
    # and c) contain a transition from each state to the target state (including the target)
209
    def _is_auto_transition(self, event):
1✔
210
        if event.name.startswith('to_') and len(event.transitions) == len(self.states):
1✔
211
            state_name = event.name[len('to_'):]
1✔
212
            try:
1✔
213
                _ = self.get_state(state_name)
1✔
214
                return True
1✔
215
            except ValueError:
1✔
216
                pass
1✔
217
        return False
1✔
218

219
    def _identify_callback(self, name):
1✔
220
        callback_type, target = super(MarkupMachine, self)._identify_callback(name)
1✔
221
        if callback_type:
1✔
222
            self._needs_update = True
1✔
223
        return callback_type, target
1✔
224

225

226
class HierarchicalMarkupMachine(MarkupMachine, HierarchicalMachine):
1✔
227
    """Extends transitions.extensions.nesting.HierarchicalMachine with markup capabilities."""
1✔
228

229

230
def rep(func, format_references=None):
1✔
231
    """Return a string representation for `func`."""
232
    if isinstance(func, string_types):
1✔
233
        return func
1✔
234
    if isinstance(func, numbers.Number):
1✔
235
        return str(func)
1✔
236
    return format_references(func) if format_references is not None else None
1✔
237

238

239
def _convert(obj, attributes, format_references):
1✔
240
    definition = {}
1✔
241
    for key in attributes:
1✔
242
        val = getattr(obj, key, False)
1✔
243
        if not val:
1✔
244
            continue
1✔
245
        if isinstance(val, string_types):
1✔
246
            definition[key] = val
1✔
247
        else:
248
            try:
1✔
249
                definition[key] = [rep(v, format_references) for v in iter(val)]
1✔
250
            except TypeError:
1✔
251
                definition[key] = rep(val, format_references)
1✔
252
    return definition
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc