• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / pce / 11126701726

01 Oct 2024 01:46PM CUT coverage: 88.705% (+0.05%) from 88.658%
11126701726

push

github

web-flow
Merge pull request #228 from atlanticwave-sdx/227-creating-a-connection-to-a-port-directly-attached-to-a-neighbor-domain-results-in-wrong-breakdown

227 creating a connection to a port directly attached to a neighbor domain results in wrong breakdown

583 of 679 branches covered (85.86%)

Branch coverage included in aggregate %.

4 of 4 new or added lines in 1 file covered. (100.0%)

1129 of 1251 relevant lines covered (90.25%)

3.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.97
/src/sdx_pce/utils/functions.py
1
# -*- coding: utf-8 -*-
2
"""
4✔
3
Created on Wed Aug 11 16:40:56 2021
4

5
@author: Yufeng Xin (yxin@renci.org)
6
"""
7

8
import copy
4✔
9
import random
4✔
10

11
from networkx.algorithms import approximation as approx
4✔
12

13
from sdx_pce.utils.constants import Constants
4✔
14

15

16
class GraphFunction:
    """Edge-weight assignment and connectivity helpers for one graph."""

    def __init__(self, g=None) -> None:
        self.graph = g

    def set_graph(self, g=None):
        """Replace the graph this helper operates on."""
        self.graph = g

    def weight_assign(self, cost_flag=Constants.COST_FLAG_HOP, cost=None):
        """
        Set the weight (cost) of every link in ``self.graph``.

        The assumption is that the objective is minimizing a function
        of the weight.

        :param cost_flag: a constant defined in constants.py, whose
            possible values are:

                - COST_FLAG_BW: weight = alpha*(1.0/bw)

                - COST_FLAG_LATENCY: weight = latency

                - COST_FLAG_RANDOM: weight = random cost

                - COST_FLAG_STATIC: given from outside (static)
                  definition

                - COST_FLAG_HOP (the default): hop: weight = 1
        :param cost: only read for COST_FLAG_STATIC, where it is
            indexed as ``cost[u, v]`` for each edge ``(u, v)``.
        :return: the assigned weights in edge-iteration order; the
            same list is also stored in ``self.distance_list``.
        """
        # Pick the per-edge weight rule once, then apply it in a single loop.
        if cost_flag == Constants.COST_FLAG_BW:

            def link_weight(src, dst, attrs):
                return Constants.ALPHA * (1.0 / attrs[Constants.BANDWIDTH])

        elif cost_flag == Constants.COST_FLAG_LATENCY:

            def link_weight(src, dst, attrs):
                return attrs[Constants.LATENCY]

        elif cost_flag == Constants.COST_FLAG_RANDOM:

            def link_weight(src, dst, attrs):
                return random.randint(1, 2**24)

        elif cost_flag == Constants.COST_FLAG_STATIC:

            def link_weight(src, dst, attrs):
                return cost[src, dst]

        else:

            def link_weight(src, dst, attrs):
                return 1.0

        assigned = []
        for src, dst, attrs in self.graph.edges(data=True):
            attrs[Constants.WEIGHT] = link_weight(src, dst, attrs)
            assigned.append(attrs[Constants.WEIGHT])

        self.distance_list = assigned
        return assigned

    def nodes_connected(self, g, u, v):
        """Return True when ``u`` is among the neighbors of ``v`` in ``g``."""
        neighbors_of_v = g.neighbors(v)
        return u in neighbors_of_v

    def get_connectivity(self):
        """Return the approximate node connectivity of ``self.graph``."""
        return approx.node_connectivity(self.graph)

    def biconnectivity(self):
        """Placeholder: does nothing and returns None."""
80

81

82
def dijnew(graph, start_node, end_node):
    """Use Dijkstra's algorithm to get the primary shortest path.

    The input graph is only read, never modified.

    :param graph: adjacency mapping ``{node: {neighbor: weight}}``. A
        weight may also be a sequence of weights (parallel links); the
        smallest one is used, and an empty sequence means "no link".
        Neighbors that do not appear as top-level keys are treated as
        nodes without outgoing links.
    :param start_node: node the path starts from.
    :param end_node: node the path must reach.
    :return: the list of nodes from ``start_node`` to ``end_node``
        (both included), or ``[]`` when ``end_node`` cannot be reached.
    """
    # A real infinity, so arbitrarily large link weights are still finite.
    infinity = float("inf")

    def link_cost(weight):
        """Return the cheapest cost of a link, or None if it has no links."""
        try:
            return min(weight)
        except TypeError:
            # Plain scalar weight: not iterable.
            return weight
        except ValueError:
            # Empty sequence of parallel links.
            return None

    shortest_distance = {node: infinity for node in graph}
    shortest_distance[start_node] = 0
    predecessor = {}
    # Separate, insertion-ordered bookkeeping: the caller's graph is left
    # intact and ties are broken in graph iteration order.
    unseen_nodes = dict.fromkeys(graph)

    while unseen_nodes:
        min_node = min(unseen_nodes, key=shortest_distance.__getitem__)
        del unseen_nodes[min_node]
        base = shortest_distance[min_node]
        if base == infinity:
            # Everything still unseen is unreachable from start_node.
            break
        for child_node, weight in graph[min_node].items():
            cost = link_cost(weight)
            if cost is None:
                continue
            candidate = base + cost
            if candidate < shortest_distance.get(child_node, infinity):
                shortest_distance[child_node] = candidate
                predecessor[child_node] = min_node

    # Walk the predecessor chain backwards from end_node to start_node.
    path = []
    current_node = end_node
    while current_node != start_node:
        path.append(current_node)
        try:
            current_node = predecessor[current_node]
        except KeyError:
            print("Path not reachable")
            return []
    path.append(start_node)
    path.reverse()

    # Reaching this point means end_node has a finite distance.
    print("Shortest distance is " + str(shortest_distance[end_node]))
    print("And the path is " + str(path))
    return path
123

124

125
def graph_simplify(graph):
    """Turn a non-simple graph into a simple graph, in place.

    :param graph: adjacency mapping ``{node: {neighbor: weight}}`` where
        a weight may be a sized collection of parallel-link weights.
    :return: the same ``graph`` object. Every non-empty collection of
        weights is replaced by its minimum (one-element collections
        included), entries whose collection is empty are removed since
        no link remains, and scalar weights are left untouched.
    """
    for links in graph.values():
        # Iterate over a snapshot of the keys: empty entries are deleted below.
        for endpoint in list(links):
            weights = links[endpoint]
            try:
                count = len(weights)
            except TypeError:
                # Scalar weight: already simple.
                continue
            if count == 0:
                del links[endpoint]
            else:
                links[endpoint] = min(weights)
    return graph
136

137

138
def backup_path(graph, start_node, end_node):
    """Remove the primary shortest path and redo Dijkstra to get the backup path.

    The caller's ``graph`` is not modified; all work happens on copies.

    :param graph: adjacency mapping ``{node: {neighbor: weight}}`` where
        a weight may be a list of parallel-link weights.
    :param start_node: node both paths start from.
    :param end_node: node both paths must reach.
    :return: whatever ``dijnew`` returns for the graph with the primary
        path's links removed; when no primary path exists, nothing is
        removed and that second search runs on an unchanged copy.
    """
    print("The primary path: ")
    # dijnew gets a private copy because it may modify the graph it receives.
    path = dijnew(copy.deepcopy(graph), start_node, end_node)

    # Drop the link used by each hop of the primary path. Zipping the path
    # with itself shifted by one yields the hops and is empty when no
    # primary path was found.
    remaining = copy.deepcopy(graph)
    for node, next_hop in zip(path, path[1:]):
        links = remaining[node][next_hop]
        if isinstance(links, list) and len(links) > 1:
            # Parallel links: the primary path used the cheapest one.
            links.remove(min(links))
        else:
            # Single link (scalar or one-element list): the hop is gone.
            del remaining[node][next_hop]

    # Unwrap one-element weight collections into plain weights.
    for neighbors in remaining.values():
        for neighbor, links in neighbors.items():
            if isinstance(links, (list, tuple)) and len(links) == 1:
                neighbors[neighbor] = links[0]

    print("The back up path: ")
    return dijnew(remaining, start_node, end_node)
171

172

173
def create_unvisited_list(link_list):
    """Return a new list holding the items produced by iterating ``link_list``.

    For a mapping this yields its keys, in iteration order.
    """
    return list(link_list)
178

179

180
def create_unvisited_node(my_list):
    """Return a dict mapping every item of ``my_list`` to ``0``."""
    return dict.fromkeys(my_list, 0)
185

186

187
def get_graph_info(link_list):
    """Print the list of items produced by iterating ``link_list``.

    For a mapping these are its keys. Nothing is returned.
    """
    # The previous loop called append() with no argument, which raised
    # TypeError for any non-empty input.
    key_list = list(link_list)
    print(key_list)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc