• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / pce / 3680947894

pending completion
3680947894

Pull #81

github

GitHub
Merge a6cd61918 into b40be3bdd
Pull Request #81: Updates coveralls.io configuration

845 of 1796 relevant lines covered (47.05%)

0.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.6
/src/Utility/functions.py
1
# -*- coding: utf-8 -*-
2
"""
1✔
3
Created on Wed Aug 11 16:40:56 2021
4

5
@author: Yufeng Xin (yxin@renci.org)
6
"""
7

8
import copy
1✔
9
import random
1✔
10

11
import networkx as nx
1✔
12
from networkx.algorithms import approximation as approx
1✔
13

14
import Utility.global_name as global_name
1✔
15

16

17
class GraphFunction:
    """Assign link weights on a graph and answer simple connectivity queries.

    The stored graph must provide the ``edges(data=True)`` call used by
    :meth:`weight_assign` (presumably a networkx graph -- confirm against
    callers).
    """

    def __init__(self, g=None) -> None:
        self.graph = g

    def set_graph(self, g=None):
        """Replace the graph this helper operates on."""
        self.graph = g

    def weight_assign(self, flag=0, cost=None):
        """Set the weight (cost) of every link and return the assigned weights.

        The objective is assumed to be minimizing a function of the weight.

        Args:
            flag: selects how the weight is derived:
                1: bandwidth, weight = alpha * (1.0 / bandwidth)
                2: latency, weight = latency
                3: random, weight = random integer in [1, 2**24]
                4: cost, weight = ``cost[u, v]`` (static, given from outside)
                0 or anything else: hop count, weight = 1.0
            cost: mapping indexed by ``(u, v)``; only read when ``flag`` is 4.

        Returns:
            The list of assigned weights, in edge iteration order.  The same
            list is also stored on ``self.distance_list``.

        Raises:
            ValueError: if ``flag`` is 4 and no ``cost`` mapping is given.
        """
        if flag == 4 and cost is None:
            raise ValueError("flag=4 requires a cost mapping indexed by (u, v)")

        distance_list = []
        for u, v, attrs in self.graph.edges(data=True):
            if flag == 1:
                weight = global_name.alpha * (1.0 / attrs[global_name.bandwidth])
            elif flag == 2:
                weight = attrs[global_name.latency]
            elif flag == 3:
                weight = random.randint(1, 2**24)
            elif flag == 4:
                weight = cost[u, v]
            else:
                weight = 1.0
            # The weight is written into the edge's data dict in place.
            attrs[global_name.weight] = weight
            distance_list.append(weight)

        self.distance_list = distance_list
        return distance_list

    @staticmethod
    def nodes_connected(g, u, v):
        """Return True if ``u`` is among ``g.neighbors(v)``.

        Declared static because it works on the graph passed in as ``g``, not
        on ``self.graph``; it can be called on the class or on an instance.
        """
        return u in g.neighbors(v)

    def get_connectivity(self):
        """Return ``approx.node_connectivity`` evaluated on the stored graph."""
        return approx.node_connectivity(self.graph)

    def biconnectivity(self):
        """Placeholder: not implemented, always returns None."""
        pass
×
71

72

73
# Use Dijkstra's algorithm to get the primary shortest path
74
def _link_weight(weight):
    """Return the cheapest cost of a link given a scalar or a collection of costs."""
    try:
        count = len(weight)
    except TypeError:
        # Scalar weight: a single link, use it as is.
        return weight
    # An empty collection means no usable link; an infinite cost never relaxes.
    return min(weight) if count else float("inf")


def dijnew(graph, start_node, end_node):
    """Find the shortest path from ``start_node`` to ``end_node`` with Dijkstra.

    Args:
        graph: dict mapping each node to a dict ``{neighbour: weight}``.  A
            weight may be a number or a collection of numbers (parallel
            links), in which case the smallest one is used.  Weights are
            assumed to be non-negative.  The mapping is not modified.
        start_node: node the path starts from.
        end_node: node the path must reach.

    Returns:
        The list of nodes from ``start_node`` to ``end_node`` inclusive, or an
        empty list when ``end_node`` cannot be reached.  The result is also
        printed.
    """
    # Work on a private, simplified adjacency map so the caller's graph
    # survives the search untouched.
    adjacency = {
        node: {child: _link_weight(weight) for child, weight in links.items()}
        for node, links in graph.items()
    }

    infinity = float("inf")
    shortest_distance = {node: infinity for node in adjacency}
    for links in adjacency.values():
        for child in links:
            # Nodes that only ever appear as a neighbour still need a distance.
            shortest_distance.setdefault(child, infinity)
    shortest_distance[start_node] = 0
    predecessor = {}

    # A dict is used as an ordered set: ties are broken by graph order.
    unvisited = dict.fromkeys(adjacency)
    while unvisited:
        current = min(unvisited, key=shortest_distance.__getitem__)
        if shortest_distance[current] == infinity:
            break  # everything still unvisited is unreachable
        del unvisited[current]
        for child, weight in adjacency[current].items():
            candidate = shortest_distance[current] + weight
            if candidate < shortest_distance[child]:
                shortest_distance[child] = candidate
                predecessor[child] = current

    if shortest_distance.get(end_node, infinity) == infinity:
        print("Path not reachable")
        return []

    # Walk the predecessor chain backwards to recover the real path.
    path = [end_node]
    while path[-1] != start_node:
        path.append(predecessor[path[-1]])
    path.reverse()

    print("Shortest distance is " + str(shortest_distance[end_node]))
    print("And the path is " + str(path))
    return path
×
114

115

116
# Turn a non-simple graph (parallel links between two nodes) into a simple graph
117
def graph_simplify(graph):
    """Collapse parallel links so every link carries a single weight.

    Args:
        graph: dict mapping each node to a dict ``{neighbour: weight}``, where
            a weight is either a scalar or a collection of parallel-link
            weights.

    Returns:
        The same ``graph`` object, modified in place: every non-empty
        collection of weights is replaced by its minimum.  Single-element
        collections are collapsed too, so the result holds plain weights.
    """
    for node in graph:
        links = graph[node]
        for endpoint in links:
            try:
                if len(links[endpoint]) >= 1:
                    # Only the value of an existing key changes, so updating
                    # the dict while iterating over it is safe.
                    links[endpoint] = min(links[endpoint])
            except TypeError:
                # Scalar weight (no len()): the link is already simple.
                continue
    return graph
×
127

128

129
# Remove the primary shortest path and rerun Dijkstra's algorithm to get the backup path
130
def backup_path(graph, start_node, end_node):
    """Compute a primary shortest path, then a backup path avoiding its links.

    The primary path is found with ``dijnew``.  Each link it uses is then
    removed from a private copy of the graph (for parallel links only the
    cheapest one is removed) and ``dijnew`` is run again on what is left.

    Args:
        graph: dict mapping each node to a dict ``{neighbour: weight}``, where
            a weight is a scalar or a list of parallel-link weights.  The
            mapping is not modified.
        start_node: node both paths start from.
        end_node: node both paths must reach.

    Returns:
        Whatever ``dijnew`` returns for the backup search.  Both searches
        also print their result.
    """
    print("The primary path: ")
    # dijnew is handed its own copy so the search cannot alter ``graph``.
    primary = dijnew(copy.deepcopy(graph), start_node, end_node)

    residual = copy.deepcopy(graph)
    for node, next_hop in zip(primary, primary[1:]):
        links = residual.get(node, {})
        if next_hop not in links:
            # No such link: the primary search did not find a real path.
            continue
        weights = links[next_hop]
        if isinstance(weights, list) and weights:
            # Parallel links: drop only the one the primary path used.
            weights.remove(min(weights))
            if not weights:
                del links[next_hop]
        else:
            del links[next_hop]

    # Reformat the updated graph: a single remaining parallel link becomes a
    # plain weight so it can be added to a distance.
    for links in residual.values():
        for endpoint, weights in links.items():
            if isinstance(weights, list) and len(weights) == 1:
                links[endpoint] = weights[0]

    print("The back up path: ")
    return dijnew(residual, start_node, end_node)
×
162

163

164
def create_unvisited_list(link_list):
    """Return a new list holding every item yielded by iterating ``link_list``.

    For a dict this is the list of its keys, in insertion order.
    """
    return list(link_list)
×
169

170

171
def create_unvisited_node(my_list):
    """Return a dict mapping every item of ``my_list`` to 0."""
    return dict.fromkeys(my_list, 0)
×
176

177

178
def get_graph_info(link_list):
    """Print and return the items yielded by iterating ``link_list``.

    For a dict these are its keys.  The list is returned as well as printed
    so callers can use it; earlier callers that ignored the result are
    unaffected.
    """
    key_list = list(link_list)
    print(key_list)
    return key_list
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc