• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / pce / 14526428112

17 Apr 2025 11:14PM UTC coverage: 90.389% (-0.05%) from 90.44%
14526428112

Pull #284

github

web-flow
Merge 8097a0bdb into 98bd7bf35
Pull Request #284: Remove duplicated code when updating link property

621 of 712 branches covered (87.22%)

Branch coverage included in aggregate %.

5 of 5 new or added lines in 2 files covered. (100.0%)

1 existing line in 1 file now uncovered.

1401 of 1525 relevant lines covered (91.87%)

1.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.64
/src/sdx_pce/topology/manager.py
1
import copy
2✔
2
import datetime
2✔
3
import logging
2✔
4
from typing import Mapping
2✔
5

6
import networkx as nx
2✔
7
from sdx_datamodel.models.link import Link
2✔
8
from sdx_datamodel.models.service import Service
2✔
9
from sdx_datamodel.models.topology import (
2✔
10
    TOPOLOGY_INITIAL_VERSION,
11
    SDX_TOPOLOGY_ID_prefix,
12
)
13
from sdx_datamodel.parsing.topologyhandler import TopologyHandler
2✔
14

15
from sdx_pce.utils.constants import Constants
2✔
16

17
from .grenmlconverter import GrenmlConverter
2✔
18

19

20
class TopologyManager:
    """
    Manager for topology operations.

    Operations are:

        - Merge multiple topologies.

        - Convert to grenml (XML).
    """

    def __init__(self):
        # The merged "super" topology of topologies of different
        # domains, with inter-domain links between them computed.
        self._topology = None

        # Mapping from topology ID to topology.
        self._topology_map = {}

        # Mapping from port ID to port.
        self._port_map = {}

        # Mapping from port ID to link.
        self._port_link_map = {}

        # Number of interdomain links we computed.
        self._num_interdomain_link = 0

        self._logger = logging.getLogger(__name__)

        # Returned by get_handler(); nothing in this class assigns a
        # handler, so it stays None unless a caller sets it.
        self.topology_handler = None

        # mapping attributes for interdomain links
        self.status_map = {
            ("up", "up"): "up",
            ("up", "error"): "error",
            ("error", "up"): "error",
            ("error", "error"): "error",
            # defaults to down
        }
        self.state_map = {
            ("enabled", "enabled"): "enabled",
            ("maintenance", "maintenance"): "maintenance",
            ("maintenance", "enabled"): "maintenance",
            ("maintenance", "disabled"): "maintenance",
            ("enabled", "maintenance"): "maintenance",
            ("disabled", "maintenance"): "maintenance",
            # defaults to disabled
        }
        # bandwidth: the bandwidth attribute will be created based on both port
        # speeds (the minimum of them). Port speed is stored on Port.type and
        # can be 100FE, 1GE, 10GE, 25GE, 40GE, 50GE, 100GE, 400GE, and Other
        # When the value Other is chosen, no bandwidth guaranteed services will
        # be supported, so that we map that value to bandwidth=0
        self.bandwidth_map = {
            "100FE": 0.1,
            "1GE": 1,
            "10GE": 10,
            "25GE": 25,
            "40GE": 40,
            "50GE": 50,
            "100GE": 100,
            "400GE": 400,
            "Other": 0,
        }

    @staticmethod
    def _port_id(port):
        """Return the ID of a link port given either an ID string or a dict."""
        return port if isinstance(port, str) else port["id"]

    @staticmethod
    def _lookup_all(ids, getter):
        """Return ``getter(i)`` for every ``i`` in ``ids``, skipping ``None``."""
        found = []
        for item_id in ids:
            obj = getter(item_id)
            if obj is not None:
                found.append(obj)
        return found

    def get_handler(self):
        """Return the ``topology_handler`` attribute (``None`` by default)."""
        return self.topology_handler

    def topology_id(self, id):
        """Set the ID of the merged topology."""
        self._topology.id = id

    def set_topology(self, topology):
        """Replace the merged topology with ``topology``."""
        self._topology = topology

    def get_topology(self):
        """Return the merged topology (``None`` if none has been added)."""
        return self._topology

    def get_topology_dict(self):
        """Return ``to_dict()`` of the merged topology."""
        return self._topology.to_dict()

    def get_topology_map(self) -> dict:
        """Return the mapping from topology ID to per-domain topology."""
        return self._topology_map

    def get_port_link_map(self) -> Mapping[str, dict]:
        """
        Return a mapping between port IDs and links.
        """
        return self._port_link_map

    def get_port_map(self) -> Mapping[str, dict]:
        """
        Return a mapping between port IDs and ports.
        """
        return self._port_map

    def clear_topology(self):
        """Drop the merged topology and every lookup table derived from it."""
        self._topology = None
        self._topology_map = {}
        self._port_link_map = {}
        # Also reset the remaining derived state so that nothing stale
        # leaks into the next topology that gets added.
        self._port_map = {}
        self._num_interdomain_link = 0

    def add_topology(self, data):
        """
        Import ``data`` as a topology and merge it into the merged topology.

        The first topology added becomes the merged topology (as a deep
        copy with a freshly generated ID/version); later ones have their
        nodes and links appended and the merged version bumped.
        """
        topology = TopologyHandler().import_topology_data(data)
        self._topology_map[topology.id] = topology

        if self._topology is None:
            self._topology = copy.deepcopy(topology)
            interdomain_ports = []

            # Generate a new topology id
            self.generate_id()
        else:
            # check the inter-domain links first.
            interdomain_ports = self.inter_domain_check(topology)
            self._num_interdomain_link += len(interdomain_ports)
            if self._num_interdomain_link == 0:
                self._logger.debug(
                    f"Warning: no interdomain links detected in {topology.id}!"
                )

            self._topology.add_nodes(topology.nodes)
            self._topology.add_links(topology.links)
            self.update_version(True)

        # Adding to the port -> link map
        for link in topology.links:
            for port in link.ports:
                self._port_link_map[self._port_id(port)] = link

        # Adding to the port map
        for node in topology.nodes:
            for port in node.ports:
                self._port_map[port.id] = port

        # inter-domain links
        self.add_inter_domain_links(topology, interdomain_ports)

        self.update_timestamp()

    def get_domain_name(self, node_id):
        """
        Find the topology ID associated with the given node ID.

        A topology ID is expected to be of the format
        "urn:sdx:topology:amlight.net", and from this, we
        can find the domain name associated with the topology.

        Returns ``None`` when no known topology has the node.

        TODO: This function name may be a misnomer?
        """
        for topology_id, topology in self._topology_map.items():
            if topology.has_node_by_id(node_id):
                return topology_id
        return None

    def generate_id(self):
        """Reset the merged topology's ID and version; return the new ID."""
        self._topology.id = SDX_TOPOLOGY_ID_prefix
        self._topology.version = TOPOLOGY_INITIAL_VERSION
        return self._topology.id

    def remove_topology(self, topology_id):
        """
        Forget the per-domain topology registered under ``topology_id``.

        NOTE(review): only the topology map entry is removed here; the
        nodes and links already merged are left untouched.
        """
        self._topology_map.pop(topology_id, None)
        if self._topology is None:
            # Nothing merged yet, so there is no version/timestamp to bump.
            return
        self.update_version(False)
        self.update_timestamp()

    def is_link_interdomain(self, link, topology):
        """
        Check if a link is an interdomain link.

        A link counts as interdomain when at least one of its ports is
        not present in the port -> link map. ``topology`` is unused.
        """
        return any(
            self._port_id(port) not in self._port_link_map
            for port in link.ports
        )

    def is_interdomain_port(self, port_id, topology_id):
        """
        Check if a Port ID is interdomain

        Returns ``True`` only when both arguments are well-formed URNs
        and the fourth ``:``-separated field differs between them.
        """
        # Sanity checks
        if not isinstance(port_id, str) or not isinstance(topology_id, str):
            return False
        if not port_id.startswith("urn:sdx:port:"):
            return False
        if not topology_id.startswith("urn:sdx:topology:"):
            return False
        return port_id.split(":")[3] != topology_id.split(":")[3]

    def _nni_links_with_transition(self, topology, old_status, new_status):
        """Links of NNI ports that went ``old_status`` -> ``new_status``."""
        changed = []
        for node in topology.nodes:
            for port in node.ports:
                if not (port.nni and port.status == new_status):
                    continue
                old_port = self.get_port_obj_by_id(self._topology, port.id)
                if not (old_port and old_port.status == old_status):
                    continue
                link = self._port_link_map.get(port.id)
                if link and link not in changed:
                    changed.append(link)
        return changed

    def get_down_nni_links(self, topology):
        """
        Return links whose NNI port is "down" in ``topology`` while the
        same port in the merged topology is "up".
        """
        return self._nni_links_with_transition(topology, "up", "down")

    def get_up_nni_links(self, topology):
        """
        Return links whose NNI port is "up" in ``topology`` while the
        same port in the merged topology is "down".
        """
        return self._nni_links_with_transition(topology, "down", "up")

    def _link_port_went_down(self, old_topology, topology, link, new_link):
        """True if a port of ``link`` is missing, down or disabled in ``topology``."""
        new_port_ids = set()
        if new_link:
            new_port_ids = {self._port_id(p) for p in new_link.ports}

        for raw_port in link.ports:
            port_id = self._port_id(raw_port)
            if port_id not in new_port_ids:
                return True
            new_port = self.get_port_obj_by_id(topology, port_id)
            if not new_port:
                return True
            old_port = self.get_port_obj_by_id(old_topology, port_id)
            if old_port is None:
                # No previous state to compare against for this port.
                continue
            if old_port.status == "up" and new_port.status == "down":
                return True
            if old_port.state == "enabled" and new_port.state == "disabled":
                return True
        return False

    def get_down_links(self, old_topology, topology):
        """
        Get the links that are down in the new topology.

        Only links whose old status is "up" or ``None`` are considered.
        A link is reported when its new version is down/disabled, or when
        one of its ports disappeared or went down/disabled. In the latter
        case the link status is also set to "down" via
        ``update_link_property``. The returned objects are the links
        from ``old_topology``.
        """
        down_links = []
        for link in old_topology.links:
            if link.status not in ("up", None):
                continue

            new_link = topology.get_link_by_id(link.id)
            if new_link and (
                new_link.status == "down" or new_link.state in ("disabled", None)
            ):
                down_links.append(link)
                continue

            # further check its ports
            if self._link_port_went_down(old_topology, topology, link, new_link):
                if new_link:
                    self.update_link_property(new_link.id, "status", "down")
                down_links.append(link)

        if down_links:
            self._logger.info(
                f"Down links detected: {[link.id for link in down_links]}"
            )
        else:
            self._logger.info("No down links detected.")
        return down_links

    def get_up_links(self, old_topology, topology):
        """
        Get the links that came up in the new topology.

        A link qualifies when it was down/disabled (or unset) in
        ``old_topology`` and is "up" and enabled (or state unset) in
        ``topology``. The returned objects are the links from ``topology``.
        """
        up_links = []
        for link in old_topology.links:
            was_down = link.status in ("down", None) or link.state in (
                "disabled",
                None,
            )
            if not was_down:
                continue
            new_link = topology.get_link_by_id(link.id)
            if new_link is None:
                continue
            if new_link.status == "up" and new_link.state in ("enabled", None):
                up_links.append(new_link)
        return up_links

    def topology_diff(self, old_topology, topology):
        """
        Compute the node/link differences between two topologies.

        Returns a 4-tuple of lists ``(removed_nodes, added_nodes,
        removed_links, added_links)``. Links that went down are added to
        the removed links and links that came up to the added links. All
        four lists are empty when ``old_topology`` is ``None``.
        """
        if old_topology is None:
            self._logger.warning(
                f"No existing topology found for update: {topology.id}"
            )
            self._logger.info(f"Topology map keys: {list(self._topology_map.keys())}")
            return ([], [], [], [])

        old_node_ids = set(old_topology.nodes_id())
        new_node_ids = set(topology.nodes_id())
        old_link_ids = set(old_topology.links_id())
        new_link_ids = set(topology.links_id())

        # obtain the objects
        removed_links_list = self._lookup_all(
            old_link_ids - new_link_ids, old_topology.get_link_by_id
        )
        added_links_list = self._lookup_all(
            new_link_ids - old_link_ids, topology.get_link_by_id
        )
        removed_nodes_list = self._lookup_all(
            old_node_ids - new_node_ids, old_topology.get_node_by_id
        )
        added_nodes_list = self._lookup_all(
            new_node_ids - old_node_ids, topology.get_node_by_id
        )

        # adding the down links to the removed links list
        for link in self.get_down_links(old_topology, topology):
            if link not in removed_links_list:
                removed_links_list.append(link)

        # adding the up links to the added links list
        added_links_list.extend(self.get_up_links(old_topology, topology))

        return (
            removed_nodes_list,
            added_nodes_list,
            removed_links_list,
            added_links_list,
        )

    def update_topology(self, data):
        """
        Replace a previously added per-domain topology with ``data``.

        Returns the same 4-tuple as ``topology_diff``, extended with the
        links affected by NNI port status changes.
        """
        # likely adding new inter-domain links
        topology = TopologyHandler().import_topology_data(data)
        old_topology = self._topology_map.get(topology.id)
        self._topology_map[topology.id] = topology

        # Nodes.
        for node in topology.nodes:
            self._topology.remove_node(node.id)

        # Links.
        for link in topology.links:
            if not self.is_link_interdomain(link, topology):
                self._topology.remove_link(link.id)
                for port in link.ports:
                    # A default avoids a KeyError if a port repeats.
                    self._port_link_map.pop(self._port_id(port), None)

        # Check the inter-domain links first.
        interdomain_ports = self.inter_domain_check(topology)
        if not interdomain_ports:
            self._logger.warning("Warning: no interdomain links detected!")

        self._topology.add_nodes(topology.nodes)
        self._topology.add_links(topology.links)

        # inter-domain links
        self.add_inter_domain_links(topology, interdomain_ports)

        # Update the port map
        for node in topology.nodes:
            for port in node.ports:
                self._port_map[port.id] = port

        # Adding to the port -> link map
        for link in topology.links:
            for port in link.ports:
                self._port_link_map[self._port_id(port)] = link

        # extract the changes for controller rerouting actions: link removal and link down
        (removed_nodes_list, added_nodes_list, removed_links_list, added_links_list) = (
            self.topology_diff(old_topology, topology)
        )

        # An unknown topology ID has nothing to compare with; treat it
        # as a change instead of failing on ``None.version``.
        if old_topology is None or topology.version > old_topology.version:
            self.update_version(True)
        if old_topology is None or topology.timestamp != old_topology.timestamp:
            self.update_timestamp()

        # extra link status changes: up <-> down that is associated with nni port status changes: up <-> down
        # comparing with the global topology to catch nni links
        # NOTE(review): the merged topology already holds this update's
        # nodes at this point, so these comparisons may never observe a
        # status change -- confirm.
        for link in self.get_down_nni_links(topology):
            if link not in removed_links_list:
                removed_links_list.append(link)

        for link in self.get_up_nni_links(topology):
            if link not in added_links_list:
                added_links_list.append(link)

        return (
            removed_nodes_list,
            added_nodes_list,
            removed_links_list,
            added_links_list,
        )

    def update_version(self, sub: bool):
        """
        Bump the merged topology's version and return the new value.

        The current version is split on "."; when it does not split into
        exactly two parts the whole value is used with sub-version "0".
        """
        try:
            [ver, sub_ver] = self._topology.version.split(".")
        except ValueError:
            ver = self._topology.version
            sub_ver = "0"

        self._topology.version = self.new_version(ver, sub_ver, sub)

        return self._topology.version

    def new_version(self, ver, sub_ver, sub: bool):
        """
        Compute the next version string.

        When ``sub_ver`` is not "0" the result is ``ver`` plus
        ``"." + (sub_ver + 1)``, whatever ``sub`` is. Otherwise the
        result is ``ver + 1`` when ``sub`` is true, else ``ver``.
        """
        new_version = ver
        if sub:
            new_version = str(int(ver) + 1)

        if sub_ver != "0":
            new_version = ver + "." + str(int(sub_ver) + 1)

        return new_version

    def update_timestamp(self):
        """Stamp the merged topology with the current local time (ISO 8601)."""
        ct = datetime.datetime.now().isoformat()
        self._topology.timestamp = ct

        return ct

    def inter_domain_check(self, topology):
        """
        Find the interdomain ports of ``topology``.

        A port qualifies when a link of ``topology`` uses a port ID that
        is already in the port -> link map (the existing link is then
        removed from the merged topology), or when its ``nni`` points to
        a port of another topology. Each port is reported once.
        """
        ports_to_links = {}
        for link in topology.links:
            for port in link.ports:
                ports_to_links[self._port_id(port)] = link

        # match any ports in the existing topology
        matched_port_ids = set()
        for port_id, link in ports_to_links.items():
            if port_id in self._port_link_map:
                # remove redundant link between two domains
                self._topology.remove_link(self._port_link_map[port_id].id)
                matched_port_ids.add(port_id)
            self._port_link_map[port_id] = link

        # count for inter-domain links according to topo spec 2.0.x
        interdomain_ports = []
        for node in topology.nodes:
            for port in node.ports:
                # Either methodology qualifies the port; list it once so
                # callers do not count it twice.
                if port.id in matched_port_ids or self.is_interdomain_port(
                    port.nni, topology.id
                ):
                    interdomain_ports.append(port)

        return interdomain_ports

    def create_update_interdomain_link(self, port1, port2):
        """Create or update an interdomain link from two ports."""
        if port2.id < port1.id:
            port1, port2 = port2, port1

        port1_id = port1.id.replace("urn:sdx:port:", "", 1)
        port2_id = port2.id.replace("urn:sdx:port:", "", 1)
        link_id = f"urn:sdx:link:interdomain:{port1_id}:{port2_id}"

        for link in self._topology.links:
            if link_id == link.id:
                break
        else:
            # No such link yet: create it with the slower port's speed.
            link = Link(
                id=link_id,
                name=f"{port1.name}--{port2.name}",
                ports=[port1.id, port2.id],
                bandwidth=min(
                    self.bandwidth_map.get(port1.type, 100),
                    self.bandwidth_map.get(port2.type, 100),
                ),
                residual_bandwidth=100,
                latency=0,
                packet_loss=0,
                availability=100,
            )
            self._topology.add_links([link])

        link.status = self.status_map.get((port1.status, port2.status), "down")
        link.state = self.state_map.get((port1.state, port2.state), "disabled")

    def add_inter_domain_links(self, topology, interdomain_ports):
        """Add inter-domain links (whenever possible)."""
        for port in interdomain_ports:
            other_port = self._port_map.get(port.nni)
            if not other_port or other_port.nni != port.id:
                self._logger.warning(
                    "Interdomain link not added now - didnt find other port:"
                    f" port={port.id} other_port={port.nni} ({other_port})"
                )
                continue
            self.create_update_interdomain_link(port, other_port)

    def get_failed_links(self) -> list:
        """Get failed links on the topology (ie., Links not up and enabled)."""
        return [
            {"id": link.id, "ports": link.ports}
            for link in self._topology.links
            if not (
                link.status in ("up", None) and link.state in ("enabled", None)
            )
        ]

    # adjacency matrix of the graph, in json?
    def generate_graph(self):
        """
        Build a networkx graph from the usable links of the merged topology.

        Links that are not up/enabled, links with a port that belongs to
        no node, and links with fewer than two ports are skipped. Returns
        ``None`` when there is no merged topology.
        """
        if self._topology is None:
            self._logger.warning("We do not have a topology yet")
            return None

        graph = nx.Graph()

        for link in self._topology.links:
            if link.status not in ("up", None) or link.state not in ("enabled", None):
                continue

            end_nodes = []
            for port in link.ports:
                port_id = self._port_id(port)
                node = self._topology.get_node_by_port(port_id)
                if node is None:
                    self._logger.warning(
                        f"This port (id: {port_id}) does not belong to "
                        f"any node in the topology, likely a Non-SDX port!"
                    )
                    break
                end_nodes.append(node)
            else:
                if len(end_nodes) < 2:
                    # An edge needs two endpoints.
                    self._logger.warning(
                        f"Link {link.id} has fewer than two ports; skipped."
                    )
                    continue

                src, dst = end_nodes[0].id, end_nodes[1].id
                graph.add_edge(src, dst)
                edge = graph.edges[src, dst]
                edge["id"] = link.id
                edge[Constants.LATENCY] = link.latency
                edge[Constants.BANDWIDTH] = (
                    link.bandwidth * link.residual_bandwidth * 0.01
                )
                # this is a percentage
                edge[Constants.RESIDUAL_BANDWIDTH] = link.residual_bandwidth
                # Floor the divisor so an exhausted link yields a very
                # large weight instead of a ZeroDivisionError.
                edge["weight"] = 1000.0 * (
                    1.0 / max(link.residual_bandwidth, 0.001)
                )
                edge[Constants.PACKET_LOSS] = link.packet_loss
                edge[Constants.AVAILABILITY] = link.availability

        return graph

    def generate_grenml(self):
        """Convert the merged topology with ``GrenmlConverter``."""
        self.converter = GrenmlConverter(self._topology)

        return self.converter.read_topology()

    def add_domain_service(self):
        """Placeholder; not implemented."""
        pass

    # may need to read from a configuration file.
    def update_private_properties(self):
        """Placeholder; not implemented."""
        pass

    def get_residul_bandwidth(self) -> dict:
        """
        Get the residual bandwidth on each link in the topology.

        :return: A dictionary indexed by the link ID with residual bandwidth as values.
        """
        residual_bandwidth = {}

        for link in self._topology.links:
            residual_bw = getattr(link, Constants.RESIDUAL_BANDWIDTH)
            if residual_bw is None:
                self._logger.warning(f"Residual bandwidth not found for link {link.id}")
                continue
            residual_bandwidth[link.id] = residual_bw

        return residual_bandwidth

    # on performance properties for now
    def update_link_property(self, link_id, property, value):
        """
        Set ``property`` to ``value`` on the link ``link_id`` everywhere.

        Updates the link in each per-domain topology that has it and in
        the merged topology. Returns the merged topology's link, or
        ``None`` if it has none with that ID.
        """
        # 1. update the individual topology
        for topology in self._topology_map.values():
            domain_link = topology.get_link_by_id(link_id)
            if domain_link is not None:
                setattr(domain_link, property, value)
                self._logger.info("updated the link.")
                # 1.2 need to change the sub_ver of the topology?

        # 2. check on the inter-domain link?
        # update the interdomain topology
        link = self._topology.get_link_by_id(link_id)

        if link is not None:
            setattr(link, property, value)
            self._logger.info(f"updated the link:{link_id} {property} to {value}")

        return link

    # on performance properties for now
    def change_link_property_by_value(
        self, port_id_0, port_id_1, property, value, replace=True
    ):
        """
        Change the residual bandwidth of the link between two ports.

        Only ``Constants.RESIDUAL_BANDWIDTH`` is acted on. With
        ``replace`` true, ``value`` becomes the new percentage. Otherwise
        ``value`` is added to the current absolute residual bandwidth and
        converted back to a percentage, floored at 0.001.
        """
        # If it's bandwidth, we need to update the residual bandwidth as a percentage
        # "bandwidth" remains to keep the original port bandwidth in topology json.
        # in the graph model, link bandwidth is computed as bandwidth*residual_bandwidth*0.01
        link = self._topology.get_link_by_port_id(port_id_0, port_id_1)
        if link is None:
            return

        original_bw = getattr(link, Constants.BANDWIDTH)
        residual = getattr(link, property)
        if property != Constants.RESIDUAL_BANDWIDTH:
            return

        if replace is False:
            if not original_bw:
                # A percentage of a zero-bandwidth link is undefined.
                self._logger.warning(
                    f"Link {link.id} has no bandwidth; {property} left unchanged"
                )
                return
            residual_bw = original_bw * residual * 0.01
            new_residual = max((residual_bw + value) * 100 / original_bw, 0.001)
        else:
            new_residual = value

        setattr(link, property, new_residual)
        self._logger.info(
            f"updated the link:{link.id}:{property}"
            f" from {residual} to {new_residual}"
        )

    def change_port_vlan_range(self, topology_id, port_id, value):
        """
        Set the VLAN range of a port in its own and in the merged topology.

        Returns ``None`` in every case; it returns early when the
        topology or the port cannot be found.
        """
        topology = self._topology_map.get(topology_id)
        if topology is None:
            self._logger.debug(
                f"Topology not found in changing vlan range:{topology_id}"
            )
            return None

        port = self.get_port_obj_by_id(topology, port_id)
        if port is None:
            self._logger.debug(f"Port not found in changing vlan range:{port_id}")
            return None
        self._logger.debug(f"Port found:{port_id};new vlan range:{value}")

        if getattr(port, "vlan_range"):
            setattr(port, "vlan_range", value)

        services = getattr(port, Constants.SERVICES)
        if services:
            l2vpn_ptp = getattr(services, Constants.L2VPN_P2P)
            if l2vpn_ptp:
                l2vpn_ptp["vlan_range"] = value
        else:
            self._logger.debug(f"Port has no services (v2):{port_id}")
            services = Service(l2vpn_ptp={"vlan_range": value}, l2vpn_ptmp={})
            setattr(port, Constants.SERVICES, services)

        # update the whole topology
        merged_port = self.get_port_obj_by_id(self.get_topology(), port_id)
        if merged_port is None:
            self._logger.debug(f"Port not found in changing vlan range:{port_id}")
            return None
        self._logger.debug(f"Port found:{port_id};new vlan range:{value}")

        if getattr(merged_port, "vlan_range"):
            setattr(merged_port, "vlan_range", value)
        setattr(merged_port, Constants.SERVICES, services)

        self._logger.info(
            "updated the port:" + port_id + " vlan_range" + " to " + str(value)
        )

    def update_element_property_json(self, data, element, element_id, property, value):
        """
        Set ``property`` on the entries of ``data[element]`` whose "id" is
        ``element_id``, then bump ``data["version"]`` and refresh
        ``data["timestamp"]``. ``data`` is modified in place.
        """
        for item in data[element]:
            if item["id"] == element_id:
                item[property] = value

        version = str(data["version"])
        try:
            [ver, sub_ver] = version.split(".")
        except ValueError:
            # Keep the current value, as update_version() does.
            ver = version
            sub_ver = "0"

        data["version"] = self.new_version(ver, sub_ver, True)
        data["timestamp"] = datetime.datetime.now().isoformat()

    def get_port_by_id(self, port_id: str):
        """
        Given port id, returns the port of the merged topology as a dict
        (``to_dict()``), or ``None`` when there is no such port.
        """
        port = self.get_port_obj_by_id(self.get_topology(), port_id)
        return None if port is None else port.to_dict()

    def get_port_obj_by_id(self, topology, port_id: str):
        """
        Given port id, returns the Port object found on a node of
        ``topology``, or ``None`` when there is no such port.
        """
        for node in topology.nodes:
            for port in node.ports:
                if port.id == port_id:
                    return port
        return None

    def are_two_ports_same_domain(self, port1_id: str, port2_id: str):
        """
        Check if two ports are in the same domain.

        Returns ``False`` when either port has no node in the merged
        topology or when the domain of a node cannot be determined.
        """
        merged = self.get_topology()
        node1 = merged.get_node_by_port(port1_id)
        node2 = merged.get_node_by_port(port2_id)
        if node1 is None or node2 is None:
            return False

        domain1 = self.get_domain_name(node1.id)
        domain2 = self.get_domain_name(node2.id)
        # Two unknown domains must not compare as "the same domain".
        return domain1 is not None and domain1 == domain2

    def update_node_property(self):
        """Placeholder; not implemented."""
        pass

    def update_port_property(self):
        """Placeholder; not implemented."""
        pass
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc