• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

atlanticwave-sdx / pce / 6317691810

26 Sep 2023 08:00PM UTC coverage: 80.58% (-0.3%) from 80.83%
6317691810

Pull #165

github

web-flow
Merge c756b30e3 into 0571c82c9
Pull Request #165: Get links in a connection path

306 of 401 branches covered (0.0%)

Branch coverage included in aggregate %.

21 of 24 new or added lines in 1 file covered. (87.5%)

21 existing lines in 2 files now uncovered.

889 of 1082 relevant lines covered (82.16%)

3.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.42
/src/sdx_pce/topology/temanager.py
1
import threading
4✔
2
from itertools import chain
4✔
3
from typing import List, Optional
4✔
4

5
import networkx as nx
4✔
6
from networkx.algorithms import approximation as approx
4✔
7
from sdx_datamodel.parsing.connectionhandler import ConnectionHandler
4✔
8

9
from sdx_pce.models import (
4✔
10
    ConnectionPath,
11
    ConnectionRequest,
12
    ConnectionSolution,
13
    TrafficMatrix,
14
    VlanTag,
15
    VlanTaggedBreakdown,
16
    VlanTaggedBreakdowns,
17
    VlanTaggedPort,
18
)
19
from sdx_pce.topology.manager import TopologyManager
4✔
20
from sdx_pce.utils.exceptions import ValidationError
4✔
21

22
UNUSED_VLAN = None
4✔
23

24

25
class TEManager:
4✔
26

27
    """
4✔
28
    TE Manager for connection - topology operations.
29

30
    Functions of this class are:
31

32
        - generate inputs to the PCE solver
33

34
        - convert the solver output.
35

36
        - VLAN reservation and unreservation.
37
    """
38

39
    def __init__(self, topology_data):
        """
        Set up the topology manager, the VLAN tags table and the graph.

        :param topology_data: a dictionary that represents a topology,
            or a falsy value (``None``, ``{}``) to start without one.
            In the latter case ``self.graph`` is set to ``None``.
        """
        self.topology_manager = TopologyManager()

        # A lock to safely perform topology operations.
        self._topology_lock = threading.Lock()

        # A {domain, {port, {vlan, in_use}}} mapping.
        self._vlan_tags_table = {}

        # topology_data is optional while investigating
        # https://github.com/atlanticwave-sdx/sdx-controller/issues/145.
        #
        # TODO: a nicer thing to do would be to keep less state around.
        # https://github.com/atlanticwave-sdx/pce/issues/122
        if not topology_data:
            self.graph = None
            return

        self.topology_manager.add_topology(topology_data)
        self.graph = self.generate_graph_te()
        self._update_vlan_tags_table(
            domain_name=topology_data.get("id"),
            port_map=self.topology_manager.get_port_map(),
        )
4✔
62

63
    def add_topology(self, topology_data: dict):
        """
        Register a new topology and rebuild its VLAN tags table entry.

        :param topology_data: a dictionary that represents a topology;
            its "id" is used as the domain name in the VLAN tags table.
        """
        manager = self.topology_manager
        manager.add_topology(topology_data)

        # The combined topology kept by TopologyManager has ports in
        # two places: attached to nodes, and attached to links.  The
        # port map used here is the one built from the links.
        self._update_vlan_tags_table(
            domain_name=topology_data.get("id"),
            port_map=manager.get_port_map(),
        )
79

80
    def update_topology(self, topology_data: dict):
        """
        Replace an already known topology with an updated version.

        The VLAN tags table is left untouched by this method.

        :param topology_data: a dictionary that represents a topology.
        """
        manager = self.topology_manager
        manager.update_topology(topology_data)

        # TODO: the VLAN tags table is deliberately not refreshed here:
        # it is not yet decided what should happen when a VLAN tag that
        # is in use becomes invalid because of the update.  See
        # https://github.com/atlanticwave-sdx/pce/issues/123
96

97
    def _update_vlan_tags_table(self, domain_name: str, port_map: dict):
        """
        Rebuild the VLAN tags table entry of a domain.

        Any existing entry for ``domain_name`` is discarded.  For each
        ``port_id -> link`` pair in ``port_map``, the label ranges found
        on the link's ports are expanded and stored under ``port_id``
        with every label marked as unused.

        :param domain_name: key of the entry to rebuild.
        :param port_map: a {port_id: link} mapping, where ``link.ports``
            is a list of port dicts with "id" and "label_range" keys.
        :raises ValidationError: if a link does not have exactly two
            ports, or if ``port_id`` is not one of the link's ports.
        """
        domain_table = {}
        self._vlan_tags_table[domain_name] = domain_table

        for port_id, link in port_map.items():
            # TODO: port here seems to be a dict, not sdx_datamodel.models.Port
            for port in link.ports:
                # Sanity checks: a link must have two ports, and the
                # port we are looking at must be one of them.
                link_port_ids = [member.get("id") for member in link.ports]
                link_port_count = len(link_port_ids)

                if link_port_count != 2:
                    raise ValidationError(f"Link has {link_port_count} ports, not 2")

                if port_id not in link_port_ids:
                    raise ValidationError(f"port {port_id} not in {link_port_ids}")

                label_range = port.get("label_range")

                # TODO: why is label_range sometimes None, and what to
                # do when that happens?
                if label_range is None:
                    print(f"label_range on {port.get('id')} is None")
                    continue

                # label_range looks like ['100-200', '1000'], so it is
                # expanded into individual labels, each of them mapped
                # to UNUSED_VLAN.
                #
                # NOTE(review): the labels come from `port`, which can be
                # either end of the link, but they are stored under
                # `port_id`; the last port with a label range wins.
                # Confirm that this is intended.
                domain_table[port_id] = dict.fromkeys(
                    self._expand_label_range(label_range), UNUSED_VLAN
                )
4✔
137

138
    def _expand_label_range(self, label_range: List[str]) -> List[int]:
4✔
139
        """
140
        Expand the label range to a list of numbers.
141
        """
142
        labels = [self._expand_label(label) for label in label_range]
4✔
143
        # flatten result and return it.
144
        return list(chain.from_iterable(labels))
4✔
145

146
    def _expand_label(self, label: str) -> List[int]:
4✔
147
        """
148
        Expand items in label range to a list of numbers.
149

150
        Items in label ranges can be of the form "100-200" or "100".
151
        For the first case, we return [100,101,...200]; for the second
152
        case, we return [100].
153
        """
154
        if not isinstance(label, str):
4✔
155
            raise ValidationError("Label must be a string.")
×
156

157
        parts = label.split("-")
4✔
158
        start = int(parts[0])
4✔
159
        stop = int(parts[-1]) + 1
4✔
160

161
        return list(range(start, stop))
4✔
162

163
    def generate_traffic_matrix(self, connection_request: dict) -> TrafficMatrix:
        """
        Build a Traffic Matrix out of a connection request.

        A connection request names an ingress port and an egress port,
        possibly in different domains, along with some other
        properties.  The ports are resolved to nodes of the topology,
        and those nodes to nodes of the graph; the result is a traffic
        matrix with a single request between the two graph nodes.

        :param connection_request: the connection request, as a dict.
        :return: a TrafficMatrix, or None if either port cannot be
            resolved to a topology node or to a graph node.
        """
        print(f"generate_traffic_matrix: connection_request: {connection_request}")

        request = ConnectionHandler().import_connection_data(connection_request)

        print(f"generate_traffic_matrix: decoded request: {request}")

        ingress_port, egress_port = request.ingress_port, request.egress_port

        print(
            f"generate_traffic_matrix, ports: "
            f"ingress_port.id: {ingress_port.id}, "
            f"egress_port.id: {egress_port.id}"
        )

        # Step 1: ports -> nodes of the topology.
        topology = self.topology_manager.get_topology()
        ingress_node = topology.get_node_by_port(ingress_port.id)
        egress_node = topology.get_node_by_port(egress_port.id)

        if ingress_node is None:
            print(f"No ingress node was found for ingress port ID '{ingress_port.id}'")
            return None

        if egress_node is None:
            print(f"No egress node is found for egress port ID '{egress_port.id}'")
            return None

        # Step 2: nodes of the topology -> nodes of the graph, matched
        # on the "id" attribute of graph nodes.
        graph_nodes = self.graph.nodes(data=True)
        ingress_nodes = [
            node for node, attrs in graph_nodes if attrs["id"] == ingress_node.id
        ]
        egress_nodes = [
            node for node, attrs in graph_nodes if attrs["id"] == egress_node.id
        ]

        if not ingress_nodes:
            print(f"No ingress node '{ingress_node.id}' found in the graph")
            return None

        if not egress_nodes:
            print(f"No egress node '{egress_node.id}' found in the graph")
            return None

        # Missing (or zero) bandwidth and latency both end up as 0.
        required_bandwidth = request.bandwidth or 0
        required_latency = request.latency or 0
        request_id = request.id

        print(
            f"Setting required_latency: {required_latency}, "
            f"required_bandwidth: {required_bandwidth}"
        )

        # Only the first matching graph node is used on each side.
        te_request = ConnectionRequest(
            source=ingress_nodes[0],
            destination=egress_nodes[0],
            required_bandwidth=required_bandwidth,
            required_latency=required_latency,
        )

        return TrafficMatrix(connection_requests=[te_request], request_id=request_id)
4✔
234

235
    def generate_graph_te(self) -> nx.Graph:
        """
        Generate the topology graph, store it, and return it.

        Nodes of the graph get integer labels; the original label of
        each node is kept in its "id" attribute.
        """
        topology_graph = self.topology_manager.generate_graph()

        # TODO: why is it necessary to keep the graph on self?
        self.graph = nx.convert_node_labels_to_integers(
            topology_graph, label_attribute="id"
        )

        return self.graph
4✔
247

248
    def graph_node_connectivity(self, source=None, dest=None):
        """
        Return the approximate node connectivity of the graph.

        :param source: source node, passed on to networkx as is.
        :param dest: destination node, passed on to networkx as is.
        :return: whatever ``approx.node_connectivity()`` returns for
            ``self.graph`` and the given pair of nodes.
        """
        # TODO: is this method really needed?
        connectivity = approx.node_connectivity(self.graph, source, dest)
        return connectivity
4✔
254

255
    def requests_connectivity(self, tm: TrafficMatrix) -> bool:
4✔
256
        """
257
        Check that connectivity is possible.
258
        """
259
        # TODO: consider using filter() and reduce(), maybe?
260
        # TODO: write some tests for this method.
261
        for request in tm.connection_requests:
4✔
262
            conn = self.graph_node_connectivity(request.source, request.destination)
4✔
263
            print(
4✔
264
                f"Request connectivity: source {request.source}, destination: {request.destination} = {conn}"
265
            )
266
            if conn is False:
4✔
267
                return False
×
268

269
        return True
4✔
270

271
    def get_links_on_path(self, solution: ConnectionSolution) -> list:
        """
        List the links that make up a connection solution.

        Each link is described by the IDs of the ports at its two ends,
        so the result is a list of dicts that looks like this:

        .. code-block::

           [{'source': 'urn:ogf:network:sdx:port:zaoxi:A1:1',
              'destination': 'urn:ogf:network:sdx:port:zaoxi:B1:3'},
            {'source': 'urn:ogf:network:sdx:port:zaoxi:B1:1',
             'destination': 'urn:ogf:network:sdx:port:sax:B3:1'},
            {'source': 'urn:ogf:network:sdx:port:sax:B3:3',
             'destination': 'urn:ogf:network:sdx:port:sax:B1:4'},
            {'source': 'urn:ogf:network:sdx:port:sax:B1:1',
             'destination': 'urn:sdx:port:amlight:B1:1'},
            {'source': 'urn:sdx:port:amlight.net:B1:3',
             'destination': 'urn:sdx:port:amlight.net:A1:1'}]

        Links for which no ports can be found are left out.

        :return: the list described above, or None when ``solution``
            or its ``connection_map`` is None.
        """
        if solution is None or solution.connection_map is None:
            print(f"Can't find paths for {solution}")
            return None

        result = []

        # The keys of the connection map are not needed here.
        for links in solution.connection_map.values():
            for link in links:
                assert isinstance(link, ConnectionPath)

                # Both ends of the link must be nodes of the graph.
                src_node = self.graph.nodes.get(link.source)
                assert src_node is not None

                dst_node = self.graph.nodes.get(link.destination)
                assert dst_node is not None

                ports = self._get_ports_by_link(link)

                print(
                    f"get_links_on_path: src_node: {src_node} (#{link.source}), "
                    f"dst_node: {dst_node} (#{link.destination}), "
                    f"ports: {ports}"
                )

                if not ports:
                    continue

                source_port, destination_port = ports
                result.append(
                    {
                        "source": source_port.get("id"),
                        "destination": destination_port.get("id"),
                    }
                )

        return result
4✔
320

321
    def generate_connection_breakdown(self, solution: ConnectionSolution) -> dict:
        """
        Take a connection solution and generate a per-domain breakdown.

        This happens in three steps: links of the solution are grouped
        by the domain of their source node; an ingress port and an
        egress port are then picked for each domain; and finally VLANs
        are reserved on those ports.

        :param solution: the connection solution.  Its
            ``connection_map`` is traversed, and its ``request_id`` is
            passed on to VLAN reservation.
        :return: the "breakdowns" part of the VLAN-tagged breakdown, as
            a dict; or None when ``solution`` or its ``connection_map``
            is None, or when VLAN reservation yields nothing.
        """
        if solution is None or solution.connection_map is None:
            print(f"Can't find a breakdown for {solution}")
            return None

        # Step 1: group links by domain.
        breakdown = {}
        paths = solution.connection_map  # p2p for now

        for domain, links in paths.items():
            print(f"domain: {domain}, links: {links}")

            current_link_set = []

            for count, link in enumerate(links):
                print(f"count: {count}, link: {link}")

                assert isinstance(link, ConnectionPath)

                src_node = self.graph.nodes.get(link.source)
                assert src_node is not None

                dst_node = self.graph.nodes.get(link.destination)
                assert dst_node is not None

                print(f"source node: {src_node}, destination node: {dst_node}")

                src_domain = self.topology_manager.get_domain_name(src_node.get("id"))
                dst_domain = self.topology_manager.get_domain_name(dst_node.get("id"))

                # TODO: what do we do when a domain can't be
                # determined? Can a domain be `None`?
                print(f"source domain: {src_domain}, destination domain: {dst_domain}")

                current_link_set.append(link)

                # The links collected so far are recorded under the
                # source domain either when this link crosses over to
                # another domain, or when it is the very last link.
                crosses_domains = src_domain != dst_domain
                if crosses_domains or count == len(links) - 1:
                    breakdown[src_domain] = list(current_link_set)

                # A new set of links starts after a domain crossing.
                if crosses_domains:
                    current_link_set = []

        print(f"[intermediate] breakdown: {breakdown}")

        # Step 2: pick ingress and egress ports for each domain,
        # starting with the ingress port of the first one.
        #
        # TODO: using a dict to represent a breakdown is dubious.
        # Dicts keep insertion order, so domains are visited in the
        # order they were first recorded; but a path that comes back
        # to a domain overwrites the earlier entry of that domain.
        domain_breakdown = {}
        final_index = len(breakdown) - 1

        for i, (domain, links) in enumerate(breakdown.items()):
            print(f"Creating domain_breakdown: domain: {domain}, links: {links}")

            if i == 0:
                # ingress port for this domain is on the first link.
                ingress_port, _ = self._get_ports_by_link(links[0])
                # egress port for this domain is on the last link; the
                # other end of that link is where the next domain starts.
                egress_port, next_ingress_port = self._get_ports_by_link(links[-1])
            else:
                ingress_port = next_ingress_port
                last_link_ports = self._get_ports_by_link(links[-1])
                if i == final_index:
                    # Last domain: the far end of the last link.
                    _, egress_port = last_link_ports
                else:
                    egress_port, next_ingress_port = last_link_ports

            segment = {"ingress_port": ingress_port, "egress_port": egress_port}
            print(f"segment for {domain}: {segment}")

            domain_breakdown[domain] = dict(segment)

        print(f"generate_connection_breakdown(): domain_breakdown: {domain_breakdown}")

        # Step 3: reserve VLANs on the ports picked above.
        tagged_breakdown = self._reserve_vlan_breakdown(
            domain_breakdown=domain_breakdown, request_id=solution.request_id
        )
        print(f"generate_connection_breakdown(): tagged_breakdown: {tagged_breakdown}")

        # Make tests pass, temporarily.
        if tagged_breakdown is None:
            return None

        assert isinstance(tagged_breakdown, VlanTaggedBreakdowns)

        # The result is a dict with the VLAN-tagged breakdown in the
        # expected format.
        return tagged_breakdown.to_dict().get("breakdowns")
4✔
423

424
    def _get_ports_by_link(self, link: ConnectionPath):
        """
        Find the ports at the two ends of a link.

        :param link: a ConnectionPath whose source and destination are
            nodes of ``self.graph``.
        :return: a (port, port) tuple, source end first; or None if the
            topology has no ports for the link.
        """
        assert isinstance(link, ConnectionPath)

        # Graph nodes are integers; topology lookups need their "id".
        graph_nodes = self.graph.nodes
        node1 = graph_nodes[link.source]["id"]
        node2 = graph_nodes[link.destination]["id"]

        print(f"_get_ports_by_link: node1: {node1}, node2: {node2}")

        topology = self.topology_manager.get_topology()
        ports = topology.get_port_by_link(node1, node2)

        # Avoid some possible crashes.
        if ports is None:
            return None

        found_node1, port1, found_node2, port2 = ports

        # The lookup must return the nodes in the order we asked for.
        assert found_node1 == node1
        assert found_node2 == node2

        return port1, port2
4✔
449

450
    """
2✔
451
    functions for vlan reservation.
452

453
    Operations are:
454

455
        - obtain the available vlan lists
456

457
        - find the vlan continuity on a path if possible.
458

459
        - find the vlan translation on the multi-domain path if
460
          continuity not possible
461

462
        - reserve the vlan on all the ports on the path
463

464
        - unreserve the vlan when the path is removed
465
    """
466

467
    def _reserve_vlan_breakdown(
4✔
468
        self,
469
        domain_breakdown: dict,
470
        request_id: str,
471
    ) -> Optional[VlanTaggedBreakdowns]:
472
        """
473
        Upate domain breakdown with VLAN reservation information.
474

475
        This is the top-level function, to be called after
476
        _generate_connection_breakdown_tm(), and should be a private
477
        implementation detail.  It should be always called, meaning,
478
        the VLAN tags should be present in the final breakdown,
479
        regardless of whether the connection request explicitly asked
480
        for it or not.
481

482
        For this to work, TEManager should maintain a table of VLAN
483
        allocation from each of the domains.  The ones that are not in
484
        use can be reserved, and the ones that are not in use anymore
485
        should be returned to the pool by calling unreserve().
486

487
        :param domain_breakdown: per port available vlan range is
488
            pased in datamodel._parse_available_vlans(self, vlan_str)
489

490
        :return: Updated domain_breakdown with the VLAN assigned to
491
                 each port along a path, or None if failure.
492
        """
493

494
        # # Check if there exist a path of vlan continuity.  This is
495
        # # disabled for now, until the simple case is handled.
496
        # selected_vlan = self.find_vlan_on_path(domain_breakdown)
497
        # if selected_vlan is not None:
498
        #     return self._reserve_vlan_on_path(domain_breakdown, selected_vlan)
499

500
        # if not, assuming vlan translation on the domain border port
501

502
        print(f"reserve_vlan_breakdown: domain_breakdown: {domain_breakdown}")
4✔
503

504
        breakdowns = {}
4✔
505

506
        # upstream_o_vlan = ""
507
        for domain, segment in domain_breakdown.items():
4✔
508
            ingress_port = segment.get("ingress_port")
4✔
509
            egress_port = segment.get("egress_port")
4✔
510

511
            print(
4✔
512
                f"VLAN reservation: domain: {domain}, "
513
                f"ingress_port: {ingress_port}, egress_port: {egress_port}"
514
            )
515

516
            if ingress_port is None or egress_port is None:
4✔
UNCOV
517
                return None
×
518

519
            ingress_vlan = self._reserve_vlan(domain, ingress_port, request_id)
4✔
520
            egress_vlan = self._reserve_vlan(domain, egress_port, request_id)
4✔
521

522
            ingress_port_id = ingress_port.get("id")
4✔
523
            egress_port_id = egress_port.get("id")
4✔
524

525
            print(
4✔
526
                f"VLAN reservation: domain: {domain}, "
527
                f"ingress_vlan: {ingress_vlan}, egress_vlan: {egress_vlan}"
528
            )
529

530
            # if one has empty vlan range, first resume reserved vlans
531
            # in the previous domain, then return false.
532
            if egress_vlan is None:
4✔
UNCOV
533
                self._unreserve_vlan(ingress_vlan)
×
UNCOV
534
                return None
×
535

536
            if ingress_vlan is None:
4✔
UNCOV
537
                self._unreserve_vlan(egress_vlan)
×
UNCOV
538
                return None
×
539

540
            # # vlan translation from upstream_o_vlan to i_vlan
541
            # segment["ingress_upstream_vlan"] = upstream_o_vlan
542
            # segment["ingress_vlan"] = ingress_vlan
543
            # segment["egress_vlan"] = egress_vlan
544
            # upstream_o_vlan = egress_vlan
545

546
            port_a = VlanTaggedPort(
4✔
547
                VlanTag(value=ingress_vlan, tag_type=1), port_id=ingress_port_id
548
            )
549
            port_z = VlanTaggedPort(
4✔
550
                VlanTag(value=egress_vlan, tag_type=1), port_id=egress_port_id
551
            )
552

553
            # Names look like "AMLIGHT_vlan_201_202_Ampath_Tenet".  We
554
            # can form the initial part, but where did the
555
            # `Ampath_Tenet` at the end come from?
556
            domain_name = domain.split(":")[-1].split(".")[0].upper()
4✔
557
            name = f"{domain_name}_vlan_{ingress_vlan}_{egress_vlan}"
4✔
558

559
            breakdowns[domain] = VlanTaggedBreakdown(
4✔
560
                name=name,
561
                dynamic_backup_path=True,
562
                uni_a=port_a,
563
                uni_z=port_z,
564
            )
565

566
        return VlanTaggedBreakdowns(breakdowns=breakdowns)
4✔
567

568
    def _find_vlan_on_path(self, path):
4✔
569
        """
570
        Find an unused available VLAN on path.
571

572
        Finds a VLAN that's not being used at the moment on a provided
573
        path.  Returns an available VLAN if possible, None if none are
574
        available on the submitted path.
575

576
        output: vlan_tag string or None
577
        """
578

579
        # TODO: implement this
580
        # https://github.com/atlanticwave-sdx/pce/issues/126
581

UNCOV
582
        assert False, "Not implemented"
×
583

584
    def _reserve_vlan_on_path(self, domain_breakdown, selected_vlan):
4✔
585
        # TODO: what is the difference between reserve_vlan and
586
        # reserve_vlan_on_path?
587

588
        # TODO: implement this
589
        # https://github.com/atlanticwave-sdx/pce/issues/126
590

591
        # return domain_breakdown
UNCOV
592
        assert False, "Not implemented"
×
593

594
    def _reserve_vlan(self, domain: str, port: dict, request_id: str, tag=None):
4✔
595
        # with self._topology_lock:
596
        #     pass
597

598
        port_id = port.get("id")
4✔
599
        print(f"reserve_vlan domain: {domain} port_id: {port_id}")
4✔
600

601
        if port_id is None:
4✔
UNCOV
602
            return None
×
603

604
        # Look up available VLAN tags by domain and port ID.
605
        domain_table = self._vlan_tags_table.get(domain)
4✔
606

607
        if domain_table is None:
4✔
UNCOV
608
            print(f"reserve_vlan: Can't find domain: {domain} entry: {domain_table}")
×
UNCOV
609
            return None
×
610

611
        vlan_table = domain_table.get(port_id)
4✔
612

613
        print(f"reserve_vlan domain: {domain} vlan_table: {vlan_table}")
4✔
614

615
        # TODO: figure out when vlan_table can be None
616
        if vlan_table is None:
4✔
UNCOV
617
            print(f"Can't find a mapping for domain:{domain} port:{port_id}")
×
UNCOV
618
            return None
×
619

620
        available_tag = None
4✔
621

622
        if tag is None:
4✔
623
            # Find the first available VLAN tag from the table.
624
            for vlan_tag, vlan_usage in vlan_table.items():
4✔
625
                if vlan_usage is UNUSED_VLAN:
4✔
626
                    available_tag = vlan_tag
4✔
627
        else:
UNCOV
628
            if vlan_table[tag] is UNUSED_VLAN:
×
629
                available_tag = tag
×
630
            else:
UNCOV
631
                return None
×
632

633
        # mark the tag as in-use.
634
        vlan_table[available_tag] = request_id
4✔
635

636
        print(
4✔
637
            f"reserve_vlan domain {domain}, after reservation: "
638
            f"vlan_table: {vlan_table}, available_tag: {available_tag}"
639
        )
640

641
        return available_tag
4✔
642

643
    def unreserve_vlan(self, request_id: str):
4✔
644
        """
645
        Return previously reserved VLANs back to the pool.
646
        """
647
        for domain, port_table in self._vlan_tags_table.items():
4✔
648
            for port, vlan_table in port_table.items():
4✔
649
                for vlan, assignment in vlan_table.items():
4✔
650
                    if assignment == request_id:
4✔
651
                        vlan_table[vlan] = UNUSED_VLAN
4✔
652

653
    # to be called by delete_connection()
654
    def _unreserve_vlan_breakdown(self, break_down):
4✔
655
        # TODO: implement this.
656
        # https://github.com/atlanticwave-sdx/pce/issues/127
657
        # with self._topology_lock:
658
        #     pass
UNCOV
659
        assert False, "Not implemented"
×
660

661
    def _unreserve_vlan(self, domain: str, port: dict, tag=None):
4✔
662
        """
663
        Mark a VLAN tag as not in use.
664
        """
665
        # TODO: implement this.
666
        # https://github.com/atlanticwave-sdx/pce/issues/127
667

668
        # with self._topology_lock:
669
        #     pass
UNCOV
670
        assert False, "Not implemented"
×
671

672
    def _print_vlan_tags_table(self):
        """
        Pretty-print the VLAN tags table; a debugging aid.
        """
        from pprint import pprint

        print("------ VLAN TAGS TABLE -------")
        pprint(self._vlan_tags_table)
        print("------------------------------")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc