• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pytransitions / transitions / 8938991339

03 May 2024 12:30PM UTC coverage: 98.432% (+0.2%) from 98.217%
8938991339

push

github

aleneum
use coverage only for mypy job and update setup.py tags

5149 of 5231 relevant lines covered (98.43%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.47
/transitions/extensions/diagrams_pygraphviz.py
1
"""
1✔
2
    transitions.extensions.diagrams
3
    -------------------------------
4

5
    Graphviz support for (nested) machines. This also includes partial views
6
    of currently valid transitions.
7
"""
8

9
import logging
1✔
10

11
try:
1✔
12
    import pygraphviz as pgv
1✔
13
except ImportError:
1✔
14
    pgv = None
1✔
15

16
from .nesting import NestedState
1✔
17
from .diagrams_base import BaseGraph
1✔
18

19
_LOGGER = logging.getLogger(__name__)
1✔
20
_LOGGER.addHandler(logging.NullHandler())
1✔
21

22

23
class Graph(BaseGraph):
1✔
24
    """Graph creation for transitions.core.Machine."""
1✔
25

26
    def _add_nodes(self, states, container):
1✔
27
        for state in states:
1✔
28
            shape = self.machine.style_attributes['node']['default']['shape']
1✔
29
            container.add_node(state['name'], label=self._convert_state_attributes(state), shape=shape)
1✔
30

31
    def _add_edges(self, transitions, container):
1✔
32
        for transition in transitions:
1✔
33
            src = transition['source']
1✔
34
            edge_attr = {'label': self._transition_label(transition)}
1✔
35
            try:
1✔
36
                dst = transition['dest']
1✔
37
            except KeyError:
1✔
38
                dst = src
1✔
39
            if container.has_edge(src, dst):
1✔
40
                edge = container.get_edge(src, dst)
1✔
41
                edge.attr['label'] = edge.attr['label'] + ' | ' + edge_attr['label']
1✔
42
            else:
43
                container.add_edge(src, dst, **edge_attr)
1✔
44

45
    def generate(self):
1✔
46

47
        self.fsm_graph = pgv.AGraph(**self.machine.machine_attributes)
1✔
48
        self.fsm_graph.node_attr.update(self.machine.style_attributes['node']['default'])
1✔
49
        self.fsm_graph.edge_attr.update(self.machine.style_attributes['edge']['default'])
1✔
50
        states, transitions = self._get_elements()
1✔
51
        self._add_nodes(states, self.fsm_graph)
1✔
52
        self._add_edges(transitions, self.fsm_graph)
1✔
53
        setattr(self.fsm_graph, 'style_attributes', self.machine.style_attributes)
1✔
54

55
    def get_graph(self, title=None, roi_state=None):
1✔
56
        if title:
1✔
57
            self.fsm_graph.graph_attr['label'] = title
1✔
58
        if roi_state:
1✔
59
            filtered = _copy_agraph(self.fsm_graph)
1✔
60
            kept_nodes = set()
1✔
61
            kept_edges = set()
1✔
62
            sep = getattr(self.machine.state_cls, "separator", None)
1✔
63
            for state in self._flatten(roi_state):
1✔
64
                kept_nodes.add(state)
1✔
65
                if sep:
1✔
66
                    state = sep.join(state.split(sep)[:-1])
1✔
67
                    while state:
1✔
68
                        kept_nodes.add(state)
1✔
69
                        state = sep.join(state.split(sep)[:-1])
1✔
70

71
            # remove all edges that have no connection to the currently active state
72
            for state in list(kept_nodes):
1✔
73
                for edge in filtered.out_edges_iter(state):
1✔
74
                    kept_nodes.add(edge[1])
1✔
75
                    kept_edges.add(edge)
1✔
76

77
                for edge in filtered.in_edges(state):
1✔
78
                    if edge.attr['color'] == self.fsm_graph.style_attributes['edge']['previous']['color']:
1✔
79
                        kept_nodes.add(edge[0])
1✔
80
                        kept_edges.add(edge)
1✔
81

82
            for node in filtered.nodes():
1✔
83
                if node not in kept_nodes:
1✔
84
                    filtered.delete_node(node)
1✔
85

86
            for edge in filtered.edges():
1✔
87
                if edge not in kept_edges:
1✔
88
                    filtered.delete_edge(edge)
1✔
89

90
            return filtered
1✔
91
        return self.fsm_graph
1✔
92

93
    def set_node_style(self, state, style):
1✔
94
        node = self.fsm_graph.get_node(state.name if hasattr(state, "name") else state)
1✔
95
        style_attr = self.fsm_graph.style_attributes.get('node', {}).get(style)
1✔
96
        node.attr.update(style_attr)
1✔
97

98
    def set_previous_transition(self, src, dst):
1✔
99
        try:
1✔
100
            edge = self.fsm_graph.get_edge(src, dst)
1✔
101
        except KeyError:
1✔
102
            self.fsm_graph.add_edge(src, dst)
1✔
103
            edge = self.fsm_graph.get_edge(src, dst)
1✔
104
        style_attr = self.fsm_graph.style_attributes.get('edge', {}).get('previous')
1✔
105
        edge.attr.update(style_attr)
1✔
106
        self.set_node_style(src, 'previous')
1✔
107
        self.set_node_style(dst, 'active')
1✔
108

109
    def reset_styling(self):
1✔
110
        for edge in self.fsm_graph.edges_iter():
1✔
111
            style_attr = self.fsm_graph.style_attributes.get('edge', {}).get('default')
1✔
112
            edge.attr.update(style_attr)
1✔
113
        for node in self.fsm_graph.nodes_iter():
1✔
114
            if 'point' not in node.attr['shape']:
1✔
115
                style_attr = self.fsm_graph.style_attributes.get('node', {}).get('inactive')
1✔
116
                node.attr.update(style_attr)
1✔
117
        for sub_graph in self.fsm_graph.subgraphs_iter():
1✔
118
            style_attr = self.fsm_graph.style_attributes.get('graph', {}).get('default')
1✔
119
            sub_graph.graph_attr.update(style_attr)
1✔
120

121

122
class NestedGraph(Graph):
1✔
123
    """Graph creation support for transitions.extensions.nested.HierarchicalGraphMachine."""
1✔
124

125
    def __init__(self, *args, **kwargs):
1✔
126
        self.seen_transitions = []
1✔
127
        super(NestedGraph, self).__init__(*args, **kwargs)
1✔
128

129
    def _add_nodes(self, states, container, prefix='', default_style='default'):
1✔
130
        for state in states:
1✔
131
            name = prefix + state['name']
1✔
132
            label = self._convert_state_attributes(state)
1✔
133

134
            if 'children' in state:
1✔
135
                cluster_name = "cluster_" + name
1✔
136
                is_parallel = isinstance(state.get('initial', ''), list)
1✔
137
                sub = container.add_subgraph(name=cluster_name, label=label, rank='source',
1✔
138
                                             **self.machine.style_attributes['graph'][default_style])
139
                root_container = sub.add_subgraph(name=cluster_name + '_root', label='', color=None, rank='min')
1✔
140
                width = '0' if is_parallel else '0.1'
1✔
141
                root_container.add_node(name, shape='point', fillcolor='black', width=width)
1✔
142
                self._add_nodes(state['children'], sub, prefix=prefix + state['name'] + NestedState.separator,
1✔
143
                                default_style='parallel' if is_parallel else 'default')
144
            else:
145
                container.add_node(name, label=label, **self.machine.style_attributes['node'][default_style])
1✔
146

147
    def _add_edges(self, transitions, container):
1✔
148

149
        for transition in transitions:
1✔
150
            # enable customizable labels
151
            label_pos = 'label'
1✔
152
            src = transition['source']
1✔
153
            try:
1✔
154
                dst = transition['dest']
1✔
155
            except KeyError:
1✔
156
                dst = src
1✔
157
            edge_attr = {}
1✔
158
            if _get_subgraph(container, 'cluster_' + src) is not None:
1✔
159
                edge_attr['ltail'] = 'cluster_' + src
1✔
160
                # edge_attr['minlen'] = "3"
161
                label_pos = 'headlabel'
1✔
162
            src_name = src
1✔
163

164
            dst_graph = _get_subgraph(container, 'cluster_' + dst)
1✔
165
            if dst_graph is not None:
1✔
166
                if not src.startswith(dst):
1✔
167
                    edge_attr['lhead'] = "cluster_" + dst
1✔
168
                    label_pos = 'taillabel' if label_pos.startswith('l') else 'label'
1✔
169
            dst_name = dst
1✔
170

171
            # remove ltail when dst is a child of src
172
            if 'ltail' in edge_attr:
1✔
173
                if _get_subgraph(container, edge_attr['ltail']).has_node(dst_name):
1✔
174
                    del edge_attr['ltail']
1✔
175

176
            edge_attr[label_pos] = self._transition_label(transition)
1✔
177
            if container.has_edge(src_name, dst_name):
1✔
178
                edge = container.get_edge(src_name, dst_name)
1✔
179
                edge.attr[label_pos] += ' | ' + edge_attr[label_pos]
1✔
180
            else:
181
                container.add_edge(src_name, dst_name, **edge_attr)
1✔
182

183
    def set_node_style(self, state, style):
1✔
184
        for state_name in self._get_state_names(state):
1✔
185
            self._set_node_style(state_name, style)
1✔
186

187
    def _set_node_style(self, state, style):
1✔
188
        try:
1✔
189
            node = self.fsm_graph.get_node(state)
1✔
190
            style_attr = self.fsm_graph.style_attributes.get('node', {}).get(style)
1✔
191
            node.attr.update(style_attr)
1✔
192
        except KeyError:
1✔
193
            subgraph = _get_subgraph(self.fsm_graph, state)
1✔
194
            style_attr = self.fsm_graph.style_attributes.get('graph', {}).get(style)
1✔
195
            subgraph.graph_attr.update(style_attr)
1✔
196

197
    def set_previous_transition(self, src, dst):
1✔
198
        src = self._get_global_name(src.split(self.machine.state_cls.separator))
1✔
199
        dst = self._get_global_name(dst.split(self.machine.state_cls.separator))
1✔
200
        edge_attr = self.fsm_graph.style_attributes.get('edge', {}).get('previous').copy()
1✔
201
        try:
1✔
202
            edge = self.fsm_graph.get_edge(src, dst)
1✔
203
        except KeyError:
1✔
204
            _src = src
1✔
205
            _dst = dst
1✔
206
            if _get_subgraph(self.fsm_graph, 'cluster_' + src):
1✔
207
                edge_attr['ltail'] = 'cluster_' + src
1✔
208
            if _get_subgraph(self.fsm_graph, 'cluster_' + dst):
1✔
209
                edge_attr['lhead'] = "cluster_" + dst
1✔
210
            try:
1✔
211
                edge = self.fsm_graph.get_edge(_src, _dst)
1✔
212
            except KeyError:
1✔
213
                self.fsm_graph.add_edge(_src, _dst)
1✔
214
                edge = self.fsm_graph.get_edge(_src, _dst)
1✔
215

216
        edge.attr.update(edge_attr)
1✔
217
        self.set_node_style(edge.attr.get("ltail") or src, 'previous')
1✔
218

219

220
def _get_subgraph(graph, name):
1✔
221
    """Searches for subgraphs in a graph.
222
    Args:
223
        g (AGraph): Container to be searched.
224
        name (str): Name of the cluster.
225
    Returns: AGraph if a cluster called 'name' exists else None
226
    """
227
    sub_graph = graph.get_subgraph(name)
1✔
228
    if sub_graph:
1✔
229
        return sub_graph
1✔
230
    for sub in graph.subgraphs_iter():
1✔
231
        sub_graph = _get_subgraph(sub, name)
1✔
232
        if sub_graph:
1✔
233
            return sub_graph
1✔
234
    return None
1✔
235

236

237
# the official copy method does not close the file handle
238
# which causes ResourceWarnings
239
def _copy_agraph(graph):
1✔
240
    from tempfile import TemporaryFile  # pylint: disable=import-outside-toplevel; Only required for special cases
1✔
241

242
    with TemporaryFile() as tmp:
1✔
243
        if hasattr(tmp, "file"):
1✔
244
            fhandle = tmp.file
×
245
        else:
246
            fhandle = tmp
1✔
247
        graph.write(fhandle)
1✔
248
        tmp.seek(0)
1✔
249
        res = graph.__class__(filename=fhandle)
1✔
250
        fhandle.close()
1✔
251
    return res
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc