• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / pce / 7880614954

13 Feb 2024 01:51AM UTC coverage: 80.0%. Remained the same
7880614954

push

github

web-flow
Merge pull request #174 from atlanticwave-sdx/add-iso3166_2_lvl4

Add iso3166_2_lvl4

308 of 405 branches covered (76.05%)

Branch coverage included in aggregate %.

896 of 1100 relevant lines covered (81.45%)

3.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.89
/src/sdx_pce/utils/functions.py
1
# -*- coding: utf-8 -*-
2
"""
4✔
3
Created on Wed Aug 11 16:40:56 2021
4

5
@author: Yufeng Xin (yxin@renci.org)
6
"""
7

8
import copy
4✔
9
import random
4✔
10

11
from networkx.algorithms import approximation as approx
4✔
12

13
from sdx_pce.utils.constants import Constants
4✔
14

15

16
class GraphFunction:
    """Assign link weights on a graph and answer simple connectivity queries.

    The wrapped graph is only accessed through ``edges(data=True)``,
    ``neighbors()`` and networkx's approximate ``node_connectivity``, so it
    is presumably a networkx graph (TODO confirm against callers).
    """

    def __init__(self, g=None) -> None:
        self.graph = g

    def set_graph(self, g=None):
        """Replace the graph this helper operates on."""
        self.graph = g

    def weight_assign(self, cost_flag=Constants.COST_FLAG_HOP, cost=None):
        """
        Set weight (cost) per link.

        The assumption is that the objective is minimizing a function of
        weight.

        :param cost_flag: a constant defined in constants.py, whose
            possible values are:

                - COST_FLAG_BW : weight = alpha*(1.0/bw)

                - COST_FLAG_LATENCY: weight = latency

                - COST_FLAG_RANDOM: weight = random cost

                - COST_FLAG_STATIC: given from outside (static)
                  definition

                - COST_FLAG_HOP (the default): hop: weight = 1
        :param cost: container indexed as ``cost[u, v]``; only read when
            ``cost_flag`` is COST_FLAG_STATIC.
        :return: the assigned weights in edge iteration order.  The same
            list is also stored as ``self.distance_list``.
        """
        # Select the per-edge weight rule once, then apply it in one loop.
        if cost_flag == Constants.COST_FLAG_BW:

            def weight_of(u, v, attrs):
                return Constants.ALPHA * (1.0 / attrs[Constants.BANDWIDTH])

        elif cost_flag == Constants.COST_FLAG_LATENCY:

            def weight_of(u, v, attrs):
                return attrs[Constants.LATENCY]

        elif cost_flag == Constants.COST_FLAG_RANDOM:

            def weight_of(u, v, attrs):
                return random.randint(1, 2**24)

        elif cost_flag == Constants.COST_FLAG_STATIC:

            def weight_of(u, v, attrs):
                return cost[u, v]

        else:
            # Any other flag (including COST_FLAG_HOP) counts hops.
            def weight_of(u, v, attrs):
                return 1.0

        weights = []
        for head, tail, attrs in self.graph.edges(data=True):
            attrs[Constants.WEIGHT] = weight_of(head, tail, attrs)
            weights.append(attrs[Constants.WEIGHT])

        self.distance_list = weights
        return weights

    def nodes_connected(self, g, u, v):
        """Return True if ``u`` is among the neighbors of ``v`` in ``g``."""
        return u in g.neighbors(v)

    def get_connectivity(self):
        """Return networkx's approximate node connectivity of the graph."""
        return approx.node_connectivity(self.graph)

    def biconnectivity(self):
        """Placeholder; not implemented (returns None)."""
        pass
×
80

81

82
def dijnew(graph, start_node, end_node):
    """Use Dijkstra's algorithm to get the primary shortest path.

    :param graph: dict of dicts, ``graph[node][neighbour] -> weight``.  It
        is passed through ``graph_simplify`` first, which collapses
        parallel-link weight lists in place.  Apart from that
        simplification the graph is left intact.
    :param start_node: node the path starts from.
    :param end_node: node the path should reach.
    :return: list of nodes from ``start_node`` to ``end_node``.  When
        ``end_node`` cannot be reached, "Path not reachable" is printed and
        the list is just ``[start_node, end_node]``.
    """
    graph_new = graph_simplify(graph)
    infinity = float("inf")

    shortest_distance = dict.fromkeys(graph_new, infinity)
    shortest_distance[start_node] = 0
    predecessor = {}

    # Keep the pending nodes in their own container so the caller's graph is
    # not emptied while we work.  An insertion-ordered dict preserves the
    # graph's iteration order, so ties are broken in a stable way.
    unseen_nodes = dict.fromkeys(graph_new)

    while unseen_nodes:
        # min() returns the first node with the smallest tentative distance.
        min_node = min(unseen_nodes, key=shortest_distance.__getitem__)
        del unseen_nodes[min_node]

        base = shortest_distance[min_node]
        for child_node, weight in graph_new[min_node].items():
            candidate = weight + base
            # .get(): a neighbour may have no adjacency entry of its own.
            if candidate < shortest_distance.get(child_node, infinity):
                shortest_distance[child_node] = candidate
                predecessor[child_node] = min_node

    # Walk the predecessor chain backwards from end_node to build the path.
    path = []
    current_node = end_node
    while current_node != start_node:
        path.insert(0, current_node)
        if current_node not in predecessor:
            print("Path not reachable")
            break
        current_node = predecessor[current_node]
    path.insert(0, start_node)

    distance = shortest_distance.get(end_node, infinity)
    if distance != infinity:  # end_node has been reached
        print("Shortest distance is " + str(distance))
        print("And the path is " + str(path))
    return path
×
123

124

125
def graph_simplify(graph):
    """Turn a non-simple graph into a simple graph, in place.

    ``graph`` is a dict of dicts, ``graph[node][endpoint] -> weight``, where
    parallel links are represented by a container of weights.  Every
    non-empty container is replaced by its smallest element; this includes
    single-element containers, which would otherwise stay lists and break
    later weight arithmetic.  Scalar weights (anything without ``len()``)
    and empty containers are left untouched.

    :param graph: adjacency mapping to simplify; it is modified in place.
    :return: the same ``graph`` object.
    """
    for links in graph.values():
        for endpoint, weights in links.items():
            try:
                count = len(weights)
            except TypeError:
                # Plain scalar weight: already simple.
                continue
            if count >= 1:
                links[endpoint] = min(weights)
    return graph
×
136

137

138
def backup_path(graph, start_node, end_node):
    """Remove the primary shortest path and rerun Dijkstra for a backup path.

    The primary path is found with ``dijnew``.  Each link it uses is then
    removed from a private copy of the graph: for parallel links only the
    cheapest weight is removed, for a single link the whole entry is
    deleted.  ``dijnew`` is run again on what remains.

    :param graph: dict of dicts, ``graph[node][neighbour] -> weight`` or a
        list of weights for parallel links.  It is not modified.
    :param start_node: node both paths start from.
    :param end_node: node both paths should reach.
    :return: the backup path as returned by ``dijnew``.
    """
    # dijnew and graph_simplify modify the graph they are given, so every
    # step works on its own deep copy and the caller's graph is untouched.
    graph_primary = copy.deepcopy(graph)
    graph_backup = copy.deepcopy(graph)
    graph_min = graph_simplify(copy.deepcopy(graph))

    print("The primary path: ")
    path = dijnew(graph_primary, start_node, end_node)

    # Delete the links used by the primary path from the backup graph.
    for node, next_node in zip(path, path[1:]):
        links = graph_backup[node]
        try:
            # Parallel links: drop only the weight the primary path used.
            links[next_node].remove(graph_min[node][next_node])
        except AttributeError:
            # Scalar weight (no .remove): the single link is gone entirely.
            del links[next_node]

    # Unwrap weight lists that are down to a single entry.
    for links in graph_backup.values():
        for neighbour, weights in links.items():
            try:
                if len(weights) == 1:
                    links[neighbour] = weights[0]
            except TypeError:
                continue

    print("The back up path: ")
    return dijnew(graph_backup, start_node, end_node)
×
171

172

173
def create_unvisited_list(link_list):
    """Return a new list with the items yielded by iterating ``link_list``.

    For a dict this is the list of its keys.
    """
    return [key for key in link_list]
×
178

179

180
def create_unvisited_node(my_list):
    """Return a dict mapping every item of ``my_list`` to 0."""
    return dict.fromkeys(my_list, 0)
×
185

186

187
def get_graph_info(link_list):
    """Print the keys of ``link_list`` and return them as a list.

    The earlier version called ``append()`` with no argument, which raised
    ``TypeError`` for any non-empty input.

    :param link_list: iterable of keys (for a dict, its keys).
    :return: list of the keys, in iteration order.
    """
    key_list = list(link_list)
    print(key_list)
    return key_list
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc