• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 8852

21 Mar 2024 03:12PM UTC coverage: 70.881% (-0.4%) from 71.311%
8852

Pull #1287

travis-pro

akhilamohanan
Do VLAN programming upfront for OpenShift on OpenStack

For OpenShift on OpenStack clusters, vnsRsCIfPathAtt is created for each
OpenStack compute host as well as for the OpenShift nodes, so that the VLAN
is already in place when a VM is migrated from one compute host to another.

The change is done to avoid datapath issues after vm migration.
Pull Request #1287: Do VLAN programming upfront for OpenShift on OpenStack

27 of 120 new or added lines in 2 files covered. (22.5%)

175 existing lines in 4 files now uncovered.

10703 of 15100 relevant lines covered (70.88%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.42
/pkg/apicapi/apicapi.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Interface for connecting to APIC REST API using websockets
16
package apicapi
17

18
import (
19
        "bytes"
20
        "crypto/tls"
21
        "crypto/x509"
22
        "encoding/json"
23
        "errors"
24
        "fmt"
25
        "io"
26
        "net/http"
27
        "net/http/cookiejar"
28
        "regexp"
29
        "sort"
30
        "strconv"
31
        "strings"
32
        "time"
33

34
        "k8s.io/apimachinery/pkg/util/wait"
35
        "k8s.io/client-go/util/workqueue"
36

37
        "github.com/gorilla/websocket"
38
        "github.com/sirupsen/logrus"
39
        "golang.org/x/time/rate"
40
)
41

42
// defaultConnectionRefresh is used as connection refresh interval if
43
// RefreshInterval is set to 0
44
const defaultConnectionRefresh = 30 * time.Second
45

46
// ApicVersion is the global APIC version to consult when handling
// version-dependent behavior during APIC interaction. It starts out as "3.1"
// and is overwritten with the actual version as part of runConn().
var ApicVersion = "3.1"
52

53
func complete(resp *http.Response) {
1✔
54
        if resp.StatusCode != http.StatusOK {
2✔
55
                rBody, err := io.ReadAll(resp.Body)
1✔
56
                if err != nil {
1✔
57
                        logrus.Errorf("ReadAll :%v", err)
×
58
                } else {
1✔
59
                        logrus.Infof("Resp: %s", rBody)
1✔
60
                }
1✔
61
        }
62
        resp.Body.Close()
1✔
63
}
64

65
// Yes, this is really stupid, but this is really how this works
66
func (conn *ApicConnection) sign(req *http.Request, uri string, body []byte) {
1✔
67
        if conn.signer == nil {
2✔
68
                return
1✔
69
        }
1✔
70

71
        sig, err := conn.signer.sign(req.Method, uri, body)
1✔
72
        if err != nil {
1✔
73
                conn.log.Error("Failed to sign request: ", err)
×
74
                return
×
75
        }
×
76

77
        req.Header.Set("Cookie", conn.apicSigCookie(sig, conn.token))
1✔
78
}
79

80
func (conn *ApicConnection) apicSigCookie(sig, token string) string {
1✔
81
        tokc := ""
1✔
82
        if token != "" {
2✔
83
                tokc = "; APIC-WebSocket-Session=" + token
1✔
84
        }
1✔
85
        return fmt.Sprintf("APIC-Request-Signature=%s; "+
1✔
86
                "APIC-Certificate-Algorithm=v1.0; "+
1✔
87
                "APIC-Certificate-DN=uni/userext/user-%s/usercert-%s.crt; "+
1✔
88
                "APIC-Certificate-Fingerprint=fingerprint%s",
1✔
89
                sig, conn.user, conn.user, tokc)
1✔
90
}
91

92
func (conn *ApicConnection) login() (string, error) {
1✔
93
        var path string
1✔
94
        var method string
1✔
95

1✔
96
        if conn.signer == nil {
2✔
97
                path = "aaaLogin"
1✔
98
                method = "POST"
1✔
99
        } else {
2✔
100
                path = "webtokenSession"
1✔
101
                method = "GET"
1✔
102
        }
1✔
103
        uri := fmt.Sprintf("/api/%s.json", path)
1✔
104
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
105

1✔
106
        var reqBody io.Reader
1✔
107
        var raw []byte
1✔
108
        var err error
1✔
109
        if conn.signer == nil {
2✔
110
                login := &ApicObject{
1✔
111
                        "aaaUser": &ApicObjectBody{
1✔
112
                                Attributes: map[string]interface{}{
1✔
113
                                        "name": conn.user,
1✔
114
                                        "pwd":  conn.password,
1✔
115
                                },
1✔
116
                        },
1✔
117
                }
1✔
118
                raw, err = json.Marshal(login)
1✔
119
                if err != nil {
1✔
120
                        return "", err
×
121
                }
×
122
                reqBody = bytes.NewBuffer(raw)
1✔
123
        }
124
        req, err := http.NewRequest(method, url, reqBody)
1✔
125
        if err != nil {
1✔
126
                return "", err
×
127
        }
×
128
        conn.log.Infof("Req: %+v", req)
1✔
129
        conn.sign(req, uri, raw)
1✔
130
        req.Header.Set("Content-Type", "application/json")
1✔
131
        resp, err := conn.client.Do(req)
1✔
132
        if err != nil {
1✔
133
                return "", err
×
134
        }
×
135
        defer complete(resp)
1✔
136

1✔
137
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
2✔
138
                conn.logErrorResp("Error while logging into APIC", resp)
1✔
139
                return "", errors.New("Server returned error status")
1✔
140
        }
1✔
141

142
        var apicresp ApicResponse
1✔
143
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
144
        if err != nil {
1✔
145
                return "", err
×
146
        }
×
147

148
        for _, obj := range apicresp.Imdata {
2✔
149
                lresp, ok := obj["aaaLogin"]
1✔
150
                if !ok {
2✔
151
                        lresp, ok = obj["webtokenSession"]
1✔
152
                        if !ok {
1✔
153
                                continue
×
154
                        }
155
                }
156

157
                token, ok := lresp.Attributes["token"]
1✔
158
                if !ok {
1✔
159
                        return "", errors.New("Token not found in login response")
×
160
                }
×
161
                stoken, isStr := token.(string)
1✔
162
                if !isStr {
1✔
163
                        return "", errors.New("Token is not a string")
×
164
                }
×
165
                return stoken, nil
1✔
166
        }
167
        return "", errors.New("Login response not found")
×
168
}
169

170
// configureTls builds the TLS client configuration for talking to APIC.
//
// A nil cert yields a configuration with certificate verification disabled
// (InsecureSkipVerify). Otherwise cert is parsed as a PEM bundle and used as
// the set of root CAs; an error is returned when no certificate can be
// loaded from it.
func configureTls(cert []byte) (*tls.Config, error) {
	cfg := &tls.Config{}
	if cert == nil {
		cfg.InsecureSkipVerify = true
		return cfg, nil
	}

	roots := x509.NewCertPool()
	if loaded := roots.AppendCertsFromPEM(cert); !loaded {
		return nil, errors.New("Could not load CA certificates")
	}
	cfg.RootCAs = roots
	return cfg, nil
}
180

181
func New(log *logrus.Logger, apic []string, user string,
182
        password string, privKey []byte, cert []byte,
183
        prefix string, refresh int, refreshTickerAdjust int,
184
        subscriptionDelay int, vrfTenant string) (*ApicConnection, error) {
1✔
185
        tls, err := configureTls(cert)
1✔
186
        if err != nil {
1✔
187
                return nil, err
×
188
        }
×
189

190
        var signer *signer
1✔
191
        if privKey != nil {
2✔
192
                signer, err = newSigner(privKey)
1✔
193
                if err != nil {
1✔
194
                        return nil, err
×
195
                }
×
196
        }
197

198
        dialer := &websocket.Dialer{
1✔
199
                TLSClientConfig: tls,
1✔
200
        }
1✔
201
        tr := &http.Transport{
1✔
202
                Proxy:           http.ProxyFromEnvironment,
1✔
203
                TLSClientConfig: dialer.TLSClientConfig,
1✔
204
        }
1✔
205
        jar, err := cookiejar.New(nil)
1✔
206
        if err != nil {
1✔
207
                return nil, err
×
208
        }
×
209
        client := &http.Client{
1✔
210
                Transport: tr,
1✔
211
                Jar:       jar,
1✔
212
                Timeout:   5 * time.Minute,
1✔
213
        }
1✔
214

1✔
215
        conn := &ApicConnection{
1✔
216
                ReconnectInterval:   time.Duration(5) * time.Second,
1✔
217
                ReconnectRetryLimit: 5,
1✔
218
                RefreshInterval:     time.Duration(refresh) * time.Second,
1✔
219
                RefreshTickerAdjust: time.Duration(refreshTickerAdjust) * time.Second,
1✔
220
                SubscriptionDelay:   time.Duration(subscriptionDelay) * time.Millisecond,
1✔
221
                SyncDone:            false,
1✔
222
                signer:              signer,
1✔
223
                dialer:              dialer,
1✔
224
                logger:              log,
1✔
225
                log:                 log.WithField("mod", "APICAPI"),
1✔
226
                Apic:                apic,
1✔
227
                user:                user,
1✔
228
                password:            password,
1✔
229
                prefix:              prefix,
1✔
230
                client:              client,
1✔
231
                vrfTenant:           vrfTenant,
1✔
232
                subscriptions: subIndex{
1✔
233
                        subs: make(map[string]*subscription),
1✔
234
                        ids:  make(map[string]string),
1✔
235
                },
1✔
236
                desiredState:       make(map[string]ApicSlice),
1✔
237
                desiredStateDn:     make(map[string]ApicObject),
1✔
238
                keyHashes:          make(map[string]string),
1✔
239
                containerDns:       make(map[string]bool),
1✔
240
                cachedState:        make(map[string]ApicSlice),
1✔
241
                cacheDnSubIds:      make(map[string]map[string]bool),
1✔
242
                pendingSubDnUpdate: make(map[string]pendingChange),
1✔
243
                CachedSubnetDns:    make(map[string]string),
1✔
244
        }
1✔
245
        return conn, nil
1✔
246
}
247

248
func (conn *ApicConnection) handleSocketUpdate(apicresp *ApicResponse) {
1✔
249
        var subIds []string
1✔
250
        switch ids := apicresp.SubscriptionId.(type) {
1✔
251
        case string:
×
252
                subIds = append(subIds, ids)
×
253
        case []interface{}:
1✔
254
                for _, id := range ids {
2✔
255
                        subIds = append(subIds, id.(string))
1✔
256
                }
1✔
257
        }
258

259
        nameAttrClass := map[string]bool{"vnsLDevVip": true, "vnsAbsGraph": true, "vzFilter": true, "vzBrCP": true, "l3extInstP": true, "vnsSvcRedirectPol": true, "vnsRedirectHealthGroup": true, "fvIPSLAMonitoringPol": true}
1✔
260

1✔
261
        for _, obj := range apicresp.Imdata {
2✔
262
                for key, body := range obj {
2✔
263
                        if dn, ok := body.Attributes["dn"].(string); ok {
2✔
264
                                if status, isStr := body.Attributes["status"].(string); isStr {
2✔
265
                                        dnSlice := strings.Split(dn, "/")
1✔
266
                                        if len(dnSlice) > 1 && strings.Contains(dnSlice[1], conn.vrfTenant) {
2✔
267
                                                var attr string
1✔
268
                                                if nameAttrClass[key] {
1✔
269
                                                        _, ok := body.Attributes["name"]
×
270
                                                        if ok {
×
271
                                                                attr = body.Attributes["name"].(string)
×
272
                                                        }
×
273
                                                } else if key == "tagAnnotation" {
1✔
274
                                                        _, ok := body.Attributes["value"]
×
275
                                                        if ok {
×
276
                                                                attr = body.Attributes["value"].(string)
×
277
                                                        }
×
278
                                                }
279
                                                if attr != "" && !strings.Contains(attr, conn.prefix) {
1✔
280
                                                        conn.log.Debug("Skipping websocket notification for :", dn)
×
281
                                                        continue
×
282
                                                }
283
                                        }
284
                                        var pendingKind int
1✔
285
                                        if status == "deleted" {
2✔
286
                                                pendingKind = pendingChangeDelete
1✔
287
                                        } else {
2✔
288
                                                pendingKind = pendingChangeUpdate
1✔
289
                                        }
1✔
290
                                        conn.indexMutex.Lock()
1✔
291

1✔
292
                                        conn.logger.WithFields(logrus.Fields{
1✔
293
                                                "mod": "APICAPI",
1✔
294
                                                "dn":  obj.GetDn(),
1✔
295
                                                "obj": obj,
1✔
296
                                        }).Debug("Processing websocket notification for:")
1✔
297

1✔
298
                                        conn.pendingSubDnUpdate[dn] = pendingChange{
1✔
299
                                                kind:    pendingKind,
1✔
300
                                                subIds:  subIds,
1✔
301
                                                isDirty: false,
1✔
302
                                        }
1✔
303
                                        if key == "opflexODev" && conn.odevQueue != nil {
2✔
304
                                                conn.log.Debug("Adding dn to odevQueue: ", dn)
1✔
305
                                                conn.odevQueue.Add(dn)
1✔
306
                                        } else if isPriorityObject(dn) {
2✔
307
                                                conn.log.Debug("Adding dn to priorityQueue: ", dn)
×
308
                                                conn.priorityQueue.Add(dn)
×
309
                                        } else {
1✔
310
                                                if conn.deltaQueue != nil {
2✔
311
                                                        conn.deltaQueue.Add(dn)
1✔
312
                                                }
1✔
313
                                        }
314
                                        conn.indexMutex.Unlock()
1✔
315
                                }
316
                        }
317
                }
318
        }
319
}
320

321
func (conn *ApicConnection) restart() {
1✔
322
        conn.indexMutex.Lock()
1✔
323
        if conn.restartCh != nil {
2✔
324
                conn.log.Debug("Restarting connection")
1✔
325
                close(conn.restartCh)
1✔
326
                conn.restartCh = nil
1✔
327
        }
1✔
328
        conn.indexMutex.Unlock()
1✔
329
}
330

331
func (conn *ApicConnection) handleQueuedDn(dn string) bool {
1✔
332
        var respClasses []string
1✔
333
        var updateHandlers []ApicObjectHandler
1✔
334
        var deleteHandlers []ApicDnHandler
1✔
335
        var rootDn string
1✔
336

1✔
337
        handleId := func(id string) {
2✔
338
                conn.indexMutex.Lock()
1✔
339
                if value, ok := conn.subscriptions.ids[id]; ok {
2✔
340
                        if sub, ok := conn.subscriptions.subs[value]; ok {
2✔
341
                                if subComp, ok := sub.childSubs[id]; ok {
1✔
342
                                        respClasses =
×
343
                                                append(respClasses, subComp.respClasses...)
×
344
                                } else {
1✔
345
                                        respClasses =
1✔
346
                                                append(respClasses, sub.respClasses...)
1✔
347
                                }
1✔
348
                                if sub.updateHook != nil {
2✔
349
                                        updateHandlers = append(updateHandlers, sub.updateHook)
1✔
350
                                }
1✔
351
                                if sub.deleteHook != nil {
2✔
352
                                        deleteHandlers = append(deleteHandlers, sub.deleteHook)
1✔
353
                                }
1✔
354

355
                                if sub.kind == apicSubTree {
1✔
356
                                        rootDn = getRootDn(dn, value)
×
357
                                }
×
358
                        }
359
                } else {
×
360
                        conn.log.Warning("Unexpected subscription: ", id)
×
361
                }
×
362
                conn.indexMutex.Unlock()
1✔
363
        }
364

365
        var requeue bool
1✔
366
        conn.indexMutex.Lock()
1✔
367
        pending, hasPendingChange := conn.pendingSubDnUpdate[dn]
1✔
368
        conn.pendingSubDnUpdate[dn] = pendingChange{isDirty: true}
1✔
369
        obj, hasDesiredState := conn.desiredStateDn[dn]
1✔
370
        conn.indexMutex.Unlock()
1✔
371

1✔
372
        if hasPendingChange {
2✔
373
                for _, id := range pending.subIds {
2✔
374
                        handleId(id)
1✔
375
                }
1✔
376
        }
377

378
        if rootDn == "" {
2✔
379
                rootDn = dn
1✔
380
        }
1✔
381

382
        if hasDesiredState {
2✔
383
                if hasPendingChange {
2✔
384
                        if pending.kind == pendingChangeDelete {
2✔
385
                                conn.logger.WithFields(logrus.Fields{"mod": "APICAPI", "DN": dn}).
1✔
386
                                        Warning("Restoring unexpectedly deleted" +
1✔
387
                                                " ACI object")
1✔
388
                                requeue = conn.postDn(dn, obj)
1✔
389
                        } else {
2✔
390
                                conn.log.Debug("getSubtreeDn for:", rootDn)
1✔
391
                                conn.getSubtreeDn(rootDn, respClasses, updateHandlers)
1✔
392
                        }
1✔
393
                } else {
1✔
394
                        requeue = conn.postDn(dn, obj)
1✔
395
                }
1✔
396
        } else {
1✔
397
                if hasPendingChange {
2✔
398
                        if pending.kind == pendingChangeDelete {
2✔
399
                                for _, handler := range deleteHandlers {
2✔
400
                                        handler(dn)
1✔
401
                                }
1✔
402
                        }
403

404
                        if (pending.kind != pendingChangeDelete) || (dn != rootDn) {
2✔
405
                                conn.log.Debug("getSubtreeDn for:", rootDn)
1✔
406
                                conn.getSubtreeDn(rootDn, respClasses, updateHandlers)
1✔
407
                        }
1✔
408
                } else {
1✔
409
                        requeue = conn.Delete(dn)
1✔
410
                }
1✔
411
        }
412

413
        return requeue
1✔
414
}
415

416
func (conn *ApicConnection) processQueue(queue workqueue.RateLimitingInterface,
417
        queueStop <-chan struct{}, name string) {
1✔
418
        go wait.Until(func() {
2✔
419
                conn.log.Debug("Running processQueue for queue ", name)
1✔
420
                for {
2✔
421
                        dn, quit := queue.Get()
1✔
422
                        if quit {
2✔
423
                                break
1✔
424
                        }
425
                        conn.log.Debug("Processing queue for:", dn)
1✔
426
                        var requeue bool
1✔
427
                        if dn, ok := dn.(string); ok {
2✔
428
                                requeue = conn.handleQueuedDn(dn)
1✔
429
                        }
1✔
430
                        if requeue {
1✔
431
                                queue.AddRateLimited(dn)
×
432
                        } else {
1✔
433
                                conn.indexMutex.Lock()
1✔
434
                                if conn.pendingSubDnUpdate[dn.(string)].isDirty {
2✔
435
                                        delete(conn.pendingSubDnUpdate, dn.(string))
1✔
436
                                }
1✔
437
                                conn.indexMutex.Unlock()
1✔
438
                                queue.Forget(dn)
1✔
439
                        }
440
                        queue.Done(dn)
1✔
441
                }
442
        }, time.Second, queueStop)
443
        <-queueStop
1✔
444
        queue.ShutDown()
1✔
445
}
446

447
// fullSync is the empty message sent on the syncHook channel in runConn to
// trigger a call to conn.fullSync().
type fullSync struct{}
448

449
func (conn *ApicConnection) runConn(stopCh <-chan struct{}) {
1✔
450
        done := make(chan struct{})
1✔
451
        restart := make(chan struct{})
1✔
452
        queueStop := make(chan struct{})
1✔
453
        odevQueueStop := make(chan struct{})
1✔
454
        priorityQueueStop := make(chan struct{})
1✔
455
        syncHook := make(chan fullSync, 1)
1✔
456
        conn.restartCh = restart
1✔
457

1✔
458
        go func() {
2✔
459
                defer conn.connection.Close()
1✔
460
                defer close(done)
1✔
461

1✔
462
                for {
2✔
463
                        var apicresp ApicResponse
1✔
464
                        err := conn.connection.ReadJSON(&apicresp)
1✔
465
                        var closeErr *websocket.CloseError
1✔
466
                        if errors.As(err, &closeErr) {
1✔
467
                                conn.log.Info("Websocket connection closed: ", closeErr.Code)
×
468
                                conn.restart()
×
469
                                break
×
470
                        } else if err != nil {
2✔
471
                                conn.log.Error("Could not read web socket message:", err)
1✔
472
                                conn.restart()
1✔
473
                                break
1✔
474
                        } else {
1✔
475
                                conn.handleSocketUpdate(&apicresp)
1✔
476
                        }
1✔
477
                }
478
        }()
479

480
        conn.indexMutex.Lock()
1✔
481
        oldState := conn.cacheDnSubIds
1✔
482
        conn.cachedState = make(map[string]ApicSlice)
1✔
483
        conn.cacheDnSubIds = make(map[string]map[string]bool)
1✔
484
        conn.deltaQueue = workqueue.NewNamedRateLimitingQueue(
1✔
485
                workqueue.NewMaxOfRateLimiter(
1✔
486
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
487
                                10*time.Second),
1✔
488
                        &workqueue.BucketRateLimiter{
1✔
489
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
490
                        },
1✔
491
                ),
1✔
492
                "delta")
1✔
493
        go conn.processQueue(conn.deltaQueue, queueStop, "delta")
1✔
494
        conn.odevQueue = workqueue.NewNamedRateLimitingQueue(
1✔
495
                workqueue.NewMaxOfRateLimiter(
1✔
496
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
497
                                10*time.Second),
1✔
498
                        &workqueue.BucketRateLimiter{
1✔
499
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
500
                        },
1✔
501
                ),
1✔
502
                "odev")
1✔
503
        go conn.processQueue(conn.odevQueue, odevQueueStop, "odev")
1✔
504
        conn.priorityQueue = workqueue.NewNamedRateLimitingQueue(
1✔
505
                workqueue.NewMaxOfRateLimiter(
1✔
506
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
507
                                10*time.Second),
1✔
508
                        &workqueue.BucketRateLimiter{
1✔
509
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
510
                        },
1✔
511
                ),
1✔
512
                "priority")
1✔
513
        go conn.processQueue(conn.priorityQueue, priorityQueueStop, "priority")
1✔
514
        conn.indexMutex.Unlock()
1✔
515

1✔
516
        refreshInterval := conn.RefreshInterval
1✔
517
        if refreshInterval == 0 {
1✔
518
                refreshInterval = defaultConnectionRefresh
×
519
        }
×
520
        // Adjust refreshTickerInterval.
521
        // To refresh the subscriptions early than actual refresh timeout value
522
        refreshTickerInterval := refreshInterval - conn.RefreshTickerAdjust
1✔
523
        refreshTicker := time.NewTicker(refreshTickerInterval)
1✔
524
        defer refreshTicker.Stop()
1✔
525

1✔
526
        var hasErr bool
1✔
527
        for value, subscription := range conn.subscriptions.subs {
2✔
528
                if !(conn.subscribe(value, subscription)) {
1✔
529
                        hasErr = true
×
530
                        conn.restart()
×
531
                        break
×
532
                }
533
        }
534
        if !hasErr {
2✔
535
                conn.checkDeletes(oldState)
1✔
536
                go func() {
2✔
537
                        if conn.FullSyncHook != nil {
1✔
538
                                conn.FullSyncHook()
×
539
                        }
×
540
                        syncHook <- fullSync{}
1✔
541
                }()
542
        }
543

544
        // Get APIC version if connection restarts
545
        if conn.version == "" && conn.checkVersion {
1✔
546
                go func() {
×
547
                        version, err := conn.GetVersion()
×
548
                        if err != nil {
×
549
                                conn.log.Error("Error while getting APIC version: ", err, " Restarting connection...")
×
550
                                conn.restart()
×
551
                        } else {
×
552
                                conn.log.Debug("Cached version:", conn.CachedVersion, " New version:", version)
×
553
                                ApicVersion = version
×
554
                                conn.CachedVersion = version
×
555
                        }
×
556
                }()
557
        }
558

559
        closeConn := func(stop bool) {
2✔
560
                close(queueStop)
1✔
561
                close(odevQueueStop)
1✔
562

1✔
563
                conn.indexMutex.Lock()
1✔
564
                conn.deltaQueue = nil
1✔
565
                conn.odevQueue = nil
1✔
566
                conn.priorityQueue = nil
1✔
567
                conn.stopped = stop
1✔
568
                conn.syncEnabled = false
1✔
569
                conn.subscriptions.ids = make(map[string]string)
1✔
570
                conn.version = ""
1✔
571
                conn.indexMutex.Unlock()
1✔
572

1✔
573
                conn.log.Debug("Shutting down web socket")
1✔
574
                err := conn.connection.WriteMessage(websocket.CloseMessage,
1✔
575
                        websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
1✔
576
                if err != nil {
1✔
577
                        conn.log.Error("Error while closing socket: ", err)
×
578
                } else {
1✔
579
                        select {
1✔
580
                        case <-done:
1✔
581
                        case <-time.After(time.Second):
×
582
                        }
583
                }
584
                conn.connection.Close()
1✔
585
        }
586

587
loop:
1✔
588
        for {
2✔
589
                select {
1✔
590
                case <-syncHook:
1✔
591
                        conn.fullSync()
1✔
592
                case <-refreshTicker.C:
1✔
593
                        conn.refresh()
1✔
594
                case <-restart:
1✔
595
                        closeConn(false)
1✔
596
                        break loop
1✔
597
                case <-stopCh:
1✔
598
                        closeConn(true)
1✔
599
                        break loop
1✔
600
                }
601
        }
602

603
        conn.log.Debug("Exiting websocket handler")
1✔
604
}
605

606
// This function should only to be called before we make the first connection to APIC.
607
// Use the cached Apic version when determining the version elsewhere. This can lead to inconsistent tokens.
608
func (conn *ApicConnection) GetVersion() (string, error) {
1✔
609
        versionMo := "firmwareCtrlrRunning"
1✔
610

1✔
611
        if len(conn.Apic) == 0 {
1✔
612
                return "", errors.New("No APIC configuration")
×
613
        }
×
614

615
        conn.checkVersion = true // enable version check on websocket reconnect
1✔
616
        // To Handle unit-tests
1✔
617
        if strings.Contains(conn.Apic[conn.ApicIndex], "127.0.0.1") {
2✔
618
                conn.version = "4.2(4i)"
1✔
619
                conn.SnatPbrFltrChain = true
1✔
620
                conn.log.Debug("Returning APIC version 4.2(4i) for test server")
1✔
621
                return conn.version, nil
1✔
622
        }
1✔
623

624
        uri := fmt.Sprintf("/api/node/class/%s.json?&", versionMo)
×
625
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
×
626

×
627
        retries := 0
×
628
        for conn.version == "" {
×
629
                if retries <= conn.ReconnectRetryLimit {
×
630
                        // Wait before Retry.
×
631
                        time.Sleep(conn.ReconnectInterval)
×
632
                        retries++
×
633
                } else {
×
634
                        return "", fmt.Errorf("Failed to get APIC version after %d retries", retries)
×
635
                }
×
636

637
                token, err := conn.login()
×
638
                if err != nil {
×
639
                        conn.log.Error("Failed to log into APIC: ", err)
×
640
                        continue
×
641
                }
642
                conn.token = token
×
643

×
644
                req, err := http.NewRequest("GET", url, http.NoBody)
×
645
                if err != nil {
×
646
                        conn.log.Error("Could not create request:", err)
×
647
                        continue
×
648
                }
649
                conn.sign(req, uri, nil)
×
650
                resp, err := conn.client.Do(req)
×
651
                if err != nil {
×
652
                        conn.log.Error("Could not get response for ", versionMo, ": ", err)
×
653
                        continue
×
654
                }
655
                defer complete(resp)
×
656
                if resp.StatusCode < 200 || resp.StatusCode >= 300 {
×
657
                        conn.logErrorResp("Could not get response for "+versionMo, resp)
×
658
                        conn.log.Debug("Request:", req)
×
659
                        continue
×
660
                }
661

662
                var apicresp ApicResponse
×
663
                err = json.NewDecoder(resp.Body).Decode(&apicresp)
×
664
                if err != nil {
×
665
                        conn.log.Error("Could not parse APIC response: ", err)
×
666
                        continue
×
667
                }
668
                for _, obj := range apicresp.Imdata {
×
669
                        vresp := obj["firmwareCtrlrRunning"]
×
670
                        version, ok := vresp.Attributes["version"]
×
671
                        if !ok {
×
672
                                conn.log.Debug("No version attribute in the response??!")
×
673
                                conn.logger.WithFields(logrus.Fields{
×
674
                                        "mod":                            "APICAPI",
×
675
                                        "firmwareCtrlrRunning":           vresp,
×
676
                                        "firmwareCtrlRunning Attributes": vresp.Attributes,
×
677
                                }).Debug("Response:")
×
678
                        } else {
×
679
                                switch version := version.(type) {
×
680
                                default:
×
681
                                case string:
×
682
                                        version_split := strings.Split(version, "(")
×
683
                                        version_number, err := strconv.ParseFloat(version_split[0], 64)
×
684
                                        conn.log.Info("Actual APIC version:", version, " Stripped out version:", version_number)
×
685
                                        if err == nil {
×
686
                                                conn.version = version //return the actual version
×
687
                                        }
×
688
                                }
689
                        }
690
                }
691
        }
692
        return conn.version, nil
×
693
}
694

695
func (conn *ApicConnection) Run(stopCh <-chan struct{}) {
1✔
696
        if len(conn.Apic) == 0 {
1✔
697
                conn.log.Warning("APIC connection not configured")
×
698
                return
×
699
        }
×
700

701
        if conn.version >= "6.0(4c)" {
1✔
702
                metadata["fvBD"].attributes["serviceBdRoutingDisable"] = "no"
×
703
        }
×
704

705
        for !conn.stopped {
2✔
706
                func() {
2✔
707
                        defer func() {
2✔
708
                                conn.ApicIndex = (conn.ApicIndex + 1) % len(conn.Apic)
1✔
709
                                time.Sleep(conn.ReconnectInterval)
1✔
710
                        }()
1✔
711

712
                        conn.logger.WithFields(logrus.Fields{
1✔
713
                                "mod":  "APICAPI",
1✔
714
                                "host": conn.Apic[conn.ApicIndex],
1✔
715
                        }).Info("Connecting to APIC")
1✔
716

1✔
717
                        for dn := range conn.subscriptions.subs {
2✔
718
                                conn.subscriptions.subs[dn].childSubs = make(map[string]subComponent)
1✔
719
                        }
1✔
720
                        conn.subscriptions.ids = make(map[string]string)
1✔
721

1✔
722
                        token, err := conn.login()
1✔
723
                        if err != nil {
2✔
724
                                conn.log.Error("Failed to log into APIC: ", err)
1✔
725
                                return
1✔
726
                        }
1✔
727
                        conn.token = token
1✔
728

1✔
729
                        uri := fmt.Sprintf("/socket%s", token)
1✔
730
                        url := fmt.Sprintf("wss://%s%s",
1✔
731
                                conn.Apic[conn.ApicIndex], uri)
1✔
732
                        header := make(http.Header)
1✔
733
                        if conn.signer != nil {
2✔
734
                                sig, err := conn.signer.sign("GET", uri, nil)
1✔
735
                                if err != nil {
1✔
736
                                        conn.log.Error("Failed to sign request: ", err)
×
737
                                        return
×
738
                                }
×
739
                                header.Set("Cookie", conn.apicSigCookie(sig, token))
1✔
740
                        }
741

742
                        conn.connection, _, err = conn.dialer.Dial(url, header)
1✔
743
                        if err != nil {
1✔
744
                                conn.log.Error("Failed to open APIC websocket: ", err)
×
745
                                return
×
746
                        }
×
747
                        conn.log.Info("Websocket connected!")
1✔
748
                        conn.runConn(stopCh)
1✔
749
                }()
750
        }
751
}
752

753
func (conn *ApicConnection) refresh() {
1✔
754
        if conn.signer == nil {
2✔
755
                url := fmt.Sprintf("https://%s/api/aaaRefresh.json",
1✔
756
                        conn.Apic[conn.ApicIndex])
1✔
757
                req, err := http.NewRequest("GET", url, http.NoBody)
1✔
758
                if err != nil {
1✔
759
                        conn.log.Error("Could not create request: ", err)
×
760
                        return
×
761
                }
×
762
                resp, err := conn.client.Do(req)
1✔
763
                if err != nil {
1✔
UNCOV
764
                        conn.log.Error("Failed to refresh APIC session: ", err)
×
UNCOV
765
                        conn.restart()
×
UNCOV
766
                        return
×
UNCOV
767
                }
×
768
                if resp.StatusCode < 200 || resp.StatusCode >= 300 {
2✔
769
                        conn.logErrorResp("Error while refreshing login", resp)
1✔
770
                        complete(resp)
1✔
771
                        conn.restart()
1✔
772
                        return
1✔
773
                }
1✔
774
                complete(resp)
1✔
775
                conn.log.Debugf("Refresh: url %v", url)
1✔
776
        }
777

778
        for _, sub := range conn.subscriptions.subs {
2✔
779
                refreshId := func(id string) {
2✔
780
                        uri := fmt.Sprintf("/api/subscriptionRefresh.json?id=%s", id)
1✔
781
                        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
782
                        req, err := http.NewRequest("GET", url, http.NoBody)
1✔
783
                        if err != nil {
1✔
784
                                conn.log.Error("Could not create request: ", err)
×
785
                                return
×
786
                        }
×
787
                        conn.sign(req, uri, nil)
1✔
788
                        resp, err := conn.client.Do(req)
1✔
789
                        if err != nil {
2✔
790
                                conn.log.Error("Failed to refresh APIC subscription: ", err)
1✔
791
                                conn.restart()
1✔
792
                                return
1✔
793
                        }
1✔
794
                        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
2✔
795
                                conn.logErrorResp("Error while refreshing subscription", resp)
1✔
796
                                complete(resp)
1✔
797
                                conn.restart()
1✔
798
                                return
1✔
799
                        }
1✔
800
                        complete(resp)
1✔
801
                        conn.log.Debugf("Refresh sub: url %v", url)
1✔
802
                        time.Sleep(conn.SubscriptionDelay)
1✔
803
                }
804
                if len(sub.childSubs) > 0 {
1✔
805
                        for id := range sub.childSubs {
×
806
                                refreshId(id)
×
807
                        }
×
808
                } else {
1✔
809
                        refreshId(sub.id)
1✔
810
                }
1✔
811
        }
812
}
813

814
func (conn *ApicConnection) logErrorResp(message string, resp *http.Response) {
1✔
815
        var apicresp ApicResponse
1✔
816
        err := json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
817
        if err != nil {
1✔
818
                conn.log.Error("Could not parse APIC error response: ", err)
×
819
        } else {
1✔
820
                code := 0
1✔
821
                text := ""
1✔
822
                for _, o := range apicresp.Imdata {
2✔
823
                        if ob, ok := o["error"]; ok {
2✔
824
                                if ob.Attributes != nil {
2✔
825
                                        if t, isStr := ob.Attributes["text"].(string); isStr {
2✔
826
                                                text = t
1✔
827
                                        }
1✔
828
                                        if c, isInt := ob.Attributes["code"].(int); isInt {
1✔
829
                                                code = c
×
830
                                        }
×
831
                                }
832
                        }
833
                }
834
                conn.logger.WithFields(logrus.Fields{
1✔
835
                        "mod":    "APICAPI",
1✔
836
                        "text":   text,
1✔
837
                        "code":   code,
1✔
838
                        "url":    resp.Request.URL,
1✔
839
                        "status": resp.StatusCode,
1✔
840
                }).Error(message)
1✔
841
        }
842
}
843

844
// To make sure cluster's POD/NodeBDs and L3OUT are all mapped
845
// to same and correct VRF.
846
func (conn *ApicConnection) ValidateAciVrfAssociation(acivrfdn string, expectedVrfRelations []string) error {
×
847
        var aciVrfBdL3OuttDns []string
×
848
        args := []string{
×
849
                "query-target=subtree",
×
850
                "target-subtree-class=fvRtCtx,fvRtEctx",
×
851
        }
×
852

×
853
        uri := fmt.Sprintf("/api/mo/%s.json?%s", acivrfdn, strings.Join(args, "&"))
×
854
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
×
855
        req, err := http.NewRequest("GET", url, http.NoBody)
×
856
        if err != nil {
×
857
                conn.log.Error("Could not create request: ", err)
×
858
                return err
×
859
        }
×
860
        conn.sign(req, uri, nil)
×
861
        resp, err := conn.client.Do(req)
×
862
        if err != nil {
×
863
                conn.log.Error("Could not get subtree for ", acivrfdn, ": ", err)
×
864
                return err
×
865
        }
×
866
        defer complete(resp)
×
867
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
×
868
                conn.logErrorResp("Could not get subtree for "+acivrfdn, resp)
×
869
                return err
×
870
        }
×
871

872
        var apicresp ApicResponse
×
873
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
×
874
        if err != nil {
×
875
                conn.log.Error("Could not parse APIC response: ", err)
×
876
                return err
×
877
        }
×
878

879
        for _, obj := range apicresp.Imdata {
×
880
                for _, body := range obj {
×
881
                        tDn, ok := body.Attributes["tDn"].(string)
×
882
                        if !ok {
×
883
                                continue
×
884
                        }
885
                        aciVrfBdL3OuttDns = append(aciVrfBdL3OuttDns, tDn)
×
886
                }
887
        }
888
        sort.Strings(aciVrfBdL3OuttDns)
×
889
        conn.log.Debug("aciVrfBdL3OuttDns:", aciVrfBdL3OuttDns)
×
890
        for _, expectedDn := range expectedVrfRelations {
×
891
                i := sort.SearchStrings(aciVrfBdL3OuttDns, expectedDn)
×
892
                if !(i < len(aciVrfBdL3OuttDns) && aciVrfBdL3OuttDns[i] == expectedDn) {
×
893
                        conn.log.Debug("Missing (or) Incorrect Vrf association: ", expectedDn)
×
894
                        return errors.New("Incorrect Pod/NodeBD/L3OUT VRF association")
×
895
                }
×
896
        }
897
        return nil
×
898
}
899

900
func (conn *ApicConnection) getSubtreeDn(dn string, respClasses []string,
901
        updateHandlers []ApicObjectHandler) {
1✔
902
        args := []string{
1✔
903
                "rsp-subtree=full",
1✔
904
        }
1✔
905

1✔
906
        if len(respClasses) > 0 {
2✔
907
                args = append(args, "rsp-subtree-class="+strings.Join(respClasses, ","))
1✔
908
        }
1✔
909
        // properly encoding the URI query parameters breaks APIC
910
        uri := fmt.Sprintf("/api/mo/%s.json?%s", dn, strings.Join(args, "&"))
1✔
911
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
912
        conn.log.Debugf("URL: %v", url)
1✔
913
        req, err := http.NewRequest("GET", url, http.NoBody)
1✔
914
        if err != nil {
1✔
915
                conn.log.Error("Could not create request: ", err)
×
916
                return
×
917
        }
×
918
        conn.sign(req, uri, nil)
1✔
919
        resp, err := conn.client.Do(req)
1✔
920
        if err != nil {
1✔
921
                conn.log.Error("Could not get subtree for ", dn, ": ", err)
×
922
                conn.restart()
×
923
                return
×
924
        }
×
925
        defer complete(resp)
1✔
926
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
1✔
927
                conn.logErrorResp("Could not get subtree for "+dn, resp)
×
928
                conn.restart()
×
929
                return
×
930
        }
×
931

932
        var apicresp ApicResponse
1✔
933
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
934
        if err != nil {
1✔
935
                conn.log.Error("Could not parse APIC response: ", err)
×
936
                return
×
937
        }
×
938
        if len(apicresp.Imdata) == 0 {
1✔
939
                conn.log.Debugf("No subtree found for dn %s", dn)
×
940
        }
×
941

942
        for _, obj := range apicresp.Imdata {
2✔
943
                conn.logger.WithFields(logrus.Fields{
1✔
944
                        "mod": "APICAPI",
1✔
945
                        "dn":  obj.GetDn(),
1✔
946
                        "obj": obj,
1✔
947
                }).Debug("Object updated on APIC")
1✔
948
                var count int
1✔
949
                prepareApicCache("", obj, &count)
1✔
950

1✔
951
                handled := false
1✔
952
                for _, handler := range updateHandlers {
1✔
953
                        if handler(obj) {
×
954
                                handled = true
×
955
                                break
×
956
                        }
957
                }
958
                if handled {
1✔
959
                        continue
×
960
                }
961
                conn.reconcileApicObject(obj)
1✔
962
        }
963
}
964

965
func (conn *ApicConnection) queuePriorityDn(dn string) {
×
966
        conn.indexMutex.Lock()
×
967
        if conn.priorityQueue != nil {
×
968
                conn.priorityQueue.Add(dn)
×
969
        }
×
970
        conn.indexMutex.Unlock()
×
971
}
972

973
func (conn *ApicConnection) queueDn(dn string) {
1✔
974
        conn.indexMutex.Lock()
1✔
975
        if conn.deltaQueue != nil {
2✔
976
                conn.deltaQueue.Add(dn)
1✔
977
        }
1✔
978
        conn.indexMutex.Unlock()
1✔
979
}
980

981
func (conn *ApicConnection) ForceRelogin() {
×
982
        conn.token = ""
×
983
}
×
984

985
func (conn *ApicConnection) PostTestAPI(data interface{}) error {
×
986
        if conn.token == "" {
×
987
                token, err := conn.login()
×
988
                if err != nil {
×
989
                        conn.log.Errorf("Login: %v", err)
×
990
                        return err
×
991
                }
×
992
                conn.token = token
×
993
        }
994
        uri := "/testapi/cloudpe/mo/.json"
×
995
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
×
996
        raw, err := json.Marshal(data)
×
997
        if err != nil {
×
998
                conn.log.Errorf("Could not serialize object for testapi %v", err)
×
999
                return err
×
1000
        }
×
1001
        req, err := http.NewRequest("POST", url, bytes.NewBuffer(raw))
×
1002
        if err != nil {
×
1003
                conn.log.Error("Could not create request: ", err)
×
1004
                return err
×
1005
        }
×
1006
        conn.sign(req, uri, raw)
×
1007
        req.Header.Set("Content-Type", "application/json")
×
1008
        conn.log.Infof("Post: %+v", req)
×
1009
        resp, err := conn.client.Do(req)
×
1010
        if err != nil {
×
1011
                conn.log.Errorf("Could not update dn %v", err)
×
1012
                return err
×
1013
        }
×
1014

1015
        complete(resp)
×
1016
        if resp.StatusCode != http.StatusOK {
×
1017
                return fmt.Errorf("Status: %v", resp.StatusCode)
×
1018
        }
×
1019
        return nil
×
1020
}
1021

1022
func (conn *ApicConnection) PostDnInline(dn string, obj ApicObject) error {
×
1023
        conn.logger.WithFields(logrus.Fields{
×
1024
                "mod": "APICAPI",
×
1025
                "dn":  dn,
×
1026
                "obj": obj,
×
1027
        }).Debug("Posting Dn Inline")
×
1028
        if conn.token == "" {
×
1029
                token, err := conn.login()
×
1030
                if err != nil {
×
1031
                        conn.log.Errorf("Login: %v", err)
×
1032
                        return err
×
1033
                }
×
1034
                conn.token = token
×
1035
        }
1036
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
×
1037
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
×
1038
        raw, err := json.Marshal(obj)
×
1039
        if err != nil {
×
1040
                conn.log.Error("Could not serialize object for dn ", dn, ": ", err)
×
1041
                return err
×
1042
        }
×
1043
        req, err := http.NewRequest("POST", url, bytes.NewBuffer(raw))
×
1044
        if err != nil {
×
1045
                conn.log.Error("Could not create request: ", err)
×
1046
                return err
×
1047
        }
×
1048
        conn.sign(req, uri, raw)
×
1049
        req.Header.Set("Content-Type", "application/json")
×
1050
        conn.log.Infof("Post: %+v", req)
×
1051
        resp, err := conn.client.Do(req)
×
1052
        if err != nil {
×
1053
                conn.log.Error("Could not update dn ", dn, ": ", err)
×
1054
                return err
×
1055
        }
×
1056

1057
        complete(resp)
×
1058
        if resp.StatusCode != http.StatusOK {
×
1059
                return fmt.Errorf("Status: %v", resp.StatusCode)
×
1060
        }
×
1061
        return nil
×
1062
}
1063

1064
func (conn *ApicConnection) DeleteDnInline(dn string) error {
1✔
1065
        conn.logger.WithFields(logrus.Fields{
1✔
1066
                "mod": "APICAPI",
1✔
1067
                "dn":  dn,
1✔
1068
        }).Debug("Deleting Dn Inline")
1✔
1069
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
1✔
1070
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1071
        req, err := http.NewRequest("DELETE", url, http.NoBody)
1✔
1072
        if err != nil {
1✔
1073
                conn.log.Error("Could not create delete request: ", err)
×
1074
                return err
×
1075
        }
×
1076
        conn.sign(req, uri, nil)
1✔
1077
        resp, err := conn.client.Do(req)
1✔
1078
        if err != nil {
2✔
1079
                conn.log.Error("Could not delete dn ", dn, ": ", err)
1✔
1080
                return err
1✔
1081
        }
1✔
1082
        defer complete(resp)
×
1083
        return nil
×
1084
}
1085

1086
func (conn *ApicConnection) postDn(dn string, obj ApicObject) bool {
1✔
1087
        conn.logger.WithFields(logrus.Fields{
1✔
1088
                "mod": "APICAPI",
1✔
1089
                "dn":  dn,
1✔
1090
                "obj": obj,
1✔
1091
        }).Debug("Posting Dn")
1✔
1092

1✔
1093
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
1✔
1094
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1095
        raw, err := json.Marshal(obj)
1✔
1096
        if err != nil {
1✔
1097
                conn.log.Error("Could not serialize object for dn ", dn, ": ", err)
×
1098
        }
×
1099
        req, err := http.NewRequest("POST", url, bytes.NewBuffer(raw))
1✔
1100
        if err != nil {
1✔
1101
                conn.log.Error("Could not create request: ", err)
×
1102
                conn.restart()
×
1103
                return false
×
1104
        }
×
1105
        conn.sign(req, uri, raw)
1✔
1106
        req.Header.Set("Content-Type", "application/json")
1✔
1107
        resp, err := conn.client.Do(req)
1✔
1108
        if err != nil {
1✔
1109
                conn.log.Error("Could not update dn ", dn, ": ", err)
×
1110
                conn.restart()
×
1111
                return false
×
1112
        }
×
1113
        defer complete(resp)
1✔
1114
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
1✔
1115
                conn.logErrorResp("Could not update dn "+dn, resp)
×
1116
                if resp.StatusCode == 400 {
×
1117
                        return true
×
1118
                }
×
1119
                conn.restart()
×
1120
        }
1121
        return false
1✔
1122
}
1123

1124
func (conn *ApicConnection) Delete(dn string) bool {
1✔
1125
        if dn == "" {
1✔
1126
                conn.log.Debug("Skip delete for empty Dn: ")
×
1127
                return false
×
1128
        }
×
1129
        dnSlice := strings.Split(dn, "/")
1✔
1130
        identifier := dnSlice[len(dnSlice)-1]
1✔
1131
        iSlice := strings.SplitN(identifier, "-", 2)
1✔
1132
        if len(iSlice) == 2 {
2✔
1133
                if iSlice[0] == "ip" {
1✔
1134
                        addr := strings.Trim(iSlice[1], "[]")
×
1135
                        obj := NewDeleteHostprotRemoteIp(addr)
×
1136
                        conn.log.Debug("Posting delete of dn ", dn)
×
1137
                        return conn.postDn(dn, obj)
×
1138
                }
×
1139
        }
1140
        return conn.DeleteDn(dn)
1✔
1141
}
1142

1143
func (conn *ApicConnection) DeleteDn(dn string) bool {
1✔
1144
        if dn == "" {
1✔
1145
                conn.log.Debug("Skip delete for empty Dn: ")
×
1146
                return false
×
1147
        }
×
1148
        conn.logger.WithFields(logrus.Fields{
1✔
1149
                "mod": "APICAPI",
1✔
1150
                "dn":  dn,
1✔
1151
        }).Debug("Deleting Dn")
1✔
1152
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
1✔
1153
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1154
        req, err := http.NewRequest("DELETE", url, http.NoBody)
1✔
1155
        if err != nil {
1✔
1156
                conn.log.Error("Could not create delete request: ", err)
×
1157
                conn.restart()
×
1158
                return false
×
1159
        }
×
1160
        conn.sign(req, uri, nil)
1✔
1161
        resp, err := conn.client.Do(req)
1✔
1162
        if err != nil {
1✔
1163
                conn.log.Error("Could not delete dn ", dn, ": ", err)
×
1164
                conn.restart()
×
1165
                return false
×
1166
        }
×
1167
        defer complete(resp)
1✔
1168
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
1✔
1169
                conn.logErrorResp("Could not delete dn "+dn, resp)
×
1170
                conn.restart()
×
1171
        }
×
1172
        return false
1✔
1173
}
1174

1175
func doComputeRespClasses(targetClasses []string,
1176
        visited map[string]bool) {
1✔
1177
        for _, class := range targetClasses {
2✔
1178
                if visited[class] {
1✔
1179
                        continue
×
1180
                }
1181
                visited[class] = true
1✔
1182
                if md, ok := metadata[class]; ok {
2✔
1183
                        doComputeRespClasses(md.children, visited)
1✔
1184
                }
1✔
1185
        }
1186
}
1187

1188
func computeRespClasses(targetClasses []string) []string {
1✔
1189
        visited := make(map[string]bool)
1✔
1190
        doComputeRespClasses(targetClasses, visited)
1✔
1191

1✔
1192
        // Don't include targetclasses in rsp-subtree
1✔
1193
        // because they are implicitly included
1✔
1194
        for i := range targetClasses {
2✔
1195
                delete(visited, targetClasses[i])
1✔
1196
        }
1✔
1197

1198
        var respClasses []string
1✔
1199
        for class := range visited {
2✔
1200
                respClasses = append(respClasses, class)
1✔
1201
        }
1✔
1202
        respClasses = append(respClasses, "tagAnnotation")
1✔
1203
        return respClasses
1✔
1204
}
1205

1206
// AddSubscriptionTree subscribe at a subtree level. class specifies
1207
// the root. Changes will cause entire subtree of the rootdn to be fetched
1208
func (conn *ApicConnection) AddSubscriptionTree(class string,
1209
        targetClasses []string, targetFilter string) {
1✔
1210
        if _, ok := classDepth[class]; !ok {
1✔
1211
                errStr := fmt.Sprintf("classDepth not for class %s", class)
×
1212
                panic(errStr)
×
1213
        }
1214

1215
        conn.indexMutex.Lock()
1✔
1216
        conn.subscriptions.subs[class] = &subscription{
1✔
1217
                kind:          apicSubTree,
1✔
1218
                childSubs:     make(map[string]subComponent),
1✔
1219
                targetClasses: targetClasses,
1✔
1220
                targetFilter:  targetFilter,
1✔
1221
        }
1✔
1222
        conn.indexMutex.Unlock()
1✔
1223
}
1224

1225
func (conn *ApicConnection) AddSubscriptionClass(class string,
1226
        targetClasses []string, targetFilter string) {
1✔
1227
        conn.indexMutex.Lock()
1✔
1228
        conn.subscriptions.subs[class] = &subscription{
1✔
1229
                kind:          apicSubClass,
1✔
1230
                childSubs:     make(map[string]subComponent),
1✔
1231
                targetClasses: targetClasses,
1✔
1232
                respClasses:   computeRespClasses(targetClasses),
1✔
1233
                targetFilter:  targetFilter,
1✔
1234
        }
1✔
1235
        conn.indexMutex.Unlock()
1✔
1236
}
1✔
1237

1238
func (conn *ApicConnection) AddSubscriptionDn(dn string,
1239
        targetClasses []string) {
1✔
1240
        conn.logger.WithFields(logrus.Fields{
1✔
1241
                "mod": "APICAPI",
1✔
1242
                "dn":  dn,
1✔
1243
        }).Debug("Adding Subscription for Dn")
1✔
1244

1✔
1245
        conn.indexMutex.Lock()
1✔
1246
        conn.subscriptions.subs[dn] = &subscription{
1✔
1247
                kind:          apicSubDn,
1✔
1248
                childSubs:     make(map[string]subComponent),
1✔
1249
                targetClasses: targetClasses,
1✔
1250
                respClasses:   computeRespClasses(targetClasses),
1✔
1251
        }
1✔
1252
        conn.indexMutex.Unlock()
1✔
1253
}
1✔
1254

1255
func (conn *ApicConnection) SetSubscriptionHooks(value string,
1256
        updateHook ApicObjectHandler, deleteHook ApicDnHandler) {
1✔
1257
        conn.indexMutex.Lock()
1✔
1258
        if s, ok := conn.subscriptions.subs[value]; ok {
2✔
1259
                s.updateHook = updateHook
1✔
1260
                s.deleteHook = deleteHook
1✔
1261
        }
1✔
1262
        conn.indexMutex.Unlock()
1✔
1263
}
1264

1265
func (conn *ApicConnection) GetApicResponse(uri string) (ApicResponse, error) {
1✔
1266
        conn.log.Debug("apicIndex: ", conn.Apic[conn.ApicIndex], " uri: ", uri)
1✔
1267
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1268
        var apicresp ApicResponse
1✔
1269
        conn.log.Debug("Apic Get url: ", url)
1✔
1270
        req, err := http.NewRequest("GET", url, http.NoBody)
1✔
1271
        if err != nil {
1✔
1272
                conn.log.Error("Could not create request: ", err)
×
1273
                return apicresp, err
×
1274
        }
×
1275
        conn.sign(req, uri, nil)
1✔
1276
        resp, err := conn.client.Do(req)
1✔
1277
        if err != nil {
1✔
1278
                conn.log.Error("Could not get response for ", url, ": ", err)
×
1279
                return apicresp, err
×
1280
        }
×
1281
        defer complete(resp)
1✔
1282
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
1✔
1283
                conn.logErrorResp("Could not get subtree for "+url, resp)
×
1284
                return apicresp, err
×
1285
        }
×
1286
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
1287
        if err != nil {
1✔
1288
                conn.log.Error("Could not parse APIC response: ", err)
×
1289
                return apicresp, err
×
1290
        }
×
1291
        return apicresp, nil
1✔
1292
}
1293

1294
func (conn *ApicConnection) doSubscribe(args []string,
1295
        kind, value, refresh_interval string, apicresp *ApicResponse) bool {
1✔
1296
        // properly encoding the URI query parameters breaks APIC
1✔
1297
        uri := fmt.Sprintf("/api/%s/%s.json?subscription=yes&%s%s",
1✔
1298
                kind, value, refresh_interval, strings.Join(args, "&"))
1✔
1299
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1300
        conn.log.Info("APIC connection URL: ", url)
1✔
1301

1✔
1302
        req, err := http.NewRequest("GET", url, http.NoBody)
1✔
1303
        if err != nil {
1✔
1304
                conn.log.Error("Could not create request: ", err)
×
1305
                return false
×
1306
        }
×
1307
        conn.sign(req, uri, nil)
1✔
1308
        resp, err := conn.client.Do(req)
1✔
1309
        if err != nil {
1✔
1310
                conn.log.Error("Failed to subscribe to ", value, ": ", err)
×
1311
                return false
×
1312
        }
×
1313
        defer complete(resp)
1✔
1314
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
1✔
1315
                conn.logErrorResp("Could not subscribe to "+value, resp)
×
1316
                return false
×
1317
        }
×
1318

1319
        err = json.NewDecoder(resp.Body).Decode(apicresp)
1✔
1320
        if err != nil {
1✔
1321
                conn.log.Error("Could not decode APIC response", err)
×
1322
                return false
×
1323
        }
×
1324
        time.Sleep(conn.SubscriptionDelay)
1✔
1325
        return true
1✔
1326
}
1327

1328
func (conn *ApicConnection) subscribe(value string, sub *subscription) bool {
1✔
1329
        baseArgs := []string{
1✔
1330
                "query-target=subtree",
1✔
1331
                "rsp-subtree=full",
1✔
1332
                "target-subtree-class=" + strings.Join(sub.targetClasses, ","),
1✔
1333
        }
1✔
1334

1✔
1335
        const defaultArgs = 1
1✔
1336
        var argCount = defaultArgs
1✔
1337
        var combinableSubClasses, separableSubClasses []string
1✔
1338
        var splitTargetClasses [][]string
1✔
1339
        var splitRespClasses [][]string
1✔
1340
        var argSet [][]string
1✔
1341
        argSet = make([][]string, defaultArgs)
1✔
1342
        argSet[defaultArgs-1] = make([]string, len(baseArgs))
1✔
1343
        copy(argSet[defaultArgs-1], baseArgs)
1✔
1344
        if sub.respClasses != nil {
2✔
1345
                separateClasses := func(classes []string, combClasses, sepClasses *[]string) {
2✔
1346
                        for i := range classes {
2✔
1347
                                if classMeta, ok := metadata[classes[i]]; ok {
2✔
1348
                                        if classes[i] == "tagAnnotation" {
2✔
1349
                                                continue
1✔
1350
                                        }
1351
                                        if classMeta.hints != nil && classMeta.hints["cardinality"] == "high" {
2✔
1352
                                                *sepClasses = append(*sepClasses, classes[i])
1✔
1353
                                                continue
1✔
1354
                                        }
1355
                                        *combClasses = append(*combClasses, classes[i])
1✔
1356
                                }
1357
                        }
1358
                }
1359
                separateClasses(sub.respClasses, &combinableSubClasses, &separableSubClasses)
1✔
1360

1✔
1361
                // In case there are high cardinality children, we register for all the classes individually.
1✔
1362
                // The concept of target-subtree and rsp-subtree class cannot be used because of the tagAnnotation object
1✔
1363
                // vmmInjectedLabel is added for every object, so getting it separately will not be scalable
1✔
1364
                if len(separableSubClasses) > 0 {
2✔
1365
                        separateClasses(sub.targetClasses, &combinableSubClasses, &separableSubClasses)
1✔
1366
                        separableSubClasses = append(separableSubClasses, combinableSubClasses...)
1✔
1367
                        baseArgs = []string{
1✔
1368
                                "query-target=subtree",
1✔
1369
                                "rsp-subtree=children",
1✔
1370
                        }
1✔
1371
                        subscribingClasses := make(map[string]bool)
1✔
1372
                        argSet = make([][]string, len(separableSubClasses))
1✔
1373
                        splitTargetClasses = make([][]string, len(separableSubClasses))
1✔
1374
                        splitRespClasses = make([][]string, len(separableSubClasses))
1✔
1375

1✔
1376
                        argCount = 0
1✔
1377
                        for i := range separableSubClasses {
2✔
1378
                                // Eliminate duplicates
1✔
1379
                                if _, ok := subscribingClasses[separableSubClasses[i]]; ok {
1✔
1380
                                        continue
×
1381
                                }
1382
                                subscribingClasses[separableSubClasses[i]] = true
1✔
1383
                                argSet[argCount] = make([]string, len(baseArgs))
1✔
1384
                                copy(argSet[argCount], baseArgs)
1✔
1385
                                argSet[argCount] = append(argSet[argCount], "target-subtree-class="+separableSubClasses[i], "rsp-subtree-class=tagAnnotation")
1✔
1386
                                splitTargetClasses[argCount] = append(splitTargetClasses[argCount], separableSubClasses[i])
1✔
1387
                                splitRespClasses[argCount] = computeRespClasses([]string{separableSubClasses[i]})
1✔
1388
                                argCount++
1✔
1389
                        }
1390
                } else {
1✔
1391
                        argSet[defaultArgs-1] = append(argSet[defaultArgs-1], "rsp-subtree-class="+strings.Join(combinableSubClasses, ",")+",tagAnnotation")
1✔
1392
                }
1✔
1393
        }
1394
        if sub.targetFilter != "" {
2✔
1395
                targetFilterArgs := "query-target-filter=" + sub.targetFilter
1✔
1396
                if len(separableSubClasses) == 0 {
2✔
1397
                        argSet[defaultArgs-1] = append(argSet[defaultArgs-1], targetFilterArgs)
1✔
1398
                } else {
1✔
1399
                        for i := 0; i < argCount; i++ {
×
1400
                                argSet[i] = append(argSet[i], targetFilterArgs)
×
1401
                        }
×
1402
                }
1403
        }
1404

1405
        kind := "mo"
1✔
1406
        if sub.kind == apicSubClass || sub.kind == apicSubTree {
2✔
1407
                kind = "class"
1✔
1408
        }
1✔
1409

1410
        refresh_interval := ""
1✔
1411
        if conn.RefreshInterval != 0 {
2✔
1412
                refresh_interval = fmt.Sprintf("refresh-timeout=%v&",
1✔
1413
                        conn.RefreshInterval.Seconds())
1✔
1414
        }
1✔
1415
        for i := 0; i < argCount; i++ {
2✔
1416
                var apicresp ApicResponse
1✔
1417
                if !conn.doSubscribe(argSet[i], kind, value, refresh_interval, &apicresp) {
1✔
1418
                        return false
×
1419
                }
×
1420
                subId, ok := apicresp.SubscriptionId.(string)
1✔
1421
                if !ok {
1✔
1422
                        conn.log.Error("Subscription ID is not a string")
×
1423
                        return false
×
1424
                }
×
1425

1426
                conn.logger.WithFields(logrus.Fields{
1✔
1427
                        "mod":   "APICAPI",
1✔
1428
                        "value": value,
1✔
1429
                        "kind":  kind,
1✔
1430
                        "id":    subId,
1✔
1431
                        "args":  argSet[i],
1✔
1432
                }).Debug("Subscribed")
1✔
1433

1✔
1434
                conn.indexMutex.Lock()
1✔
1435
                if argCount > defaultArgs {
2✔
1436
                        sub.childSubs[subId] = subComponent{
1✔
1437
                                targetClasses: splitTargetClasses[i],
1✔
1438
                                respClasses:   splitRespClasses[i],
1✔
1439
                        }
1✔
1440
                } else {
2✔
1441
                        conn.subscriptions.subs[value].id = subId
1✔
1442
                }
1✔
1443
                conn.subscriptions.ids[subId] = value
1✔
1444
                conn.indexMutex.Unlock()
1✔
1445
                var respObjCount int
1✔
1446
                for _, obj := range apicresp.Imdata {
2✔
1447
                        dn := obj.GetDn()
1✔
1448
                        if dn == "" {
1✔
1449
                                continue
×
1450
                        }
1451
                        conn.indexMutex.Lock()
1✔
1452
                        subIds, found := conn.cacheDnSubIds[dn]
1✔
1453
                        if !found {
2✔
1454
                                subIds = make(map[string]bool)
1✔
1455
                                conn.cacheDnSubIds[dn] = subIds
1✔
1456
                        }
1✔
1457
                        subIds[subId] = true
1✔
1458
                        conn.indexMutex.Unlock()
1✔
1459

1✔
1460
                        if sub.updateHook != nil && sub.updateHook(obj) {
2✔
1461
                                continue
1✔
1462
                        }
1463

1464
                        tag := obj.GetTag()
1✔
1465
                        if !conn.isSyncTag(tag) {
1✔
1466
                                continue
×
1467
                        }
1468

1469
                        conn.logger.WithFields(logrus.Fields{
1✔
1470
                                "mod": "APICAPI",
1✔
1471
                                "dn":  dn,
1✔
1472
                                "tag": tag,
1✔
1473
                                "obj": obj,
1✔
1474
                        }).Debug("Caching")
1✔
1475
                        var count int
1✔
1476
                        prepareApicCache("", obj, &count)
1✔
1477
                        respObjCount += count
1✔
1478
                        conn.indexMutex.Lock()
1✔
1479
                        conn.cachedState[tag] = append(conn.cachedState[tag], obj)
1✔
1480
                        conn.indexMutex.Unlock()
1✔
1481
                }
1482
                if respObjCount >= ApicSubscriptionResponseMoMaxCount/10 {
1✔
1483
                        conn.logger.WithFields(logrus.Fields{
×
1484
                                "args":       argSet[i],
×
1485
                                "moCount":    respObjCount,
×
1486
                                "maxAllowed": ApicSubscriptionResponseMoMaxCount,
×
1487
                        }).Warning("Subscription response is significantly large. Each new object will add 2 Mos atleast and twice the number of labels on the object")
×
1488
                } else {
1✔
1489
                        conn.logger.WithFields(logrus.Fields{
1✔
1490
                                "moCount": respObjCount,
1✔
1491
                        }).Debug("ResponseObjCount")
1✔
1492
                }
1✔
1493
        }
1494

1495
        return true
1✔
1496
}
1497

1498
// tagRegexp matches a run of 1-31 letters, digits or underscores, followed by
// a hyphen and exactly 32 lowercase hexadecimal characters. The pattern is
// not anchored, so it matches anywhere inside the candidate string.
var tagRegexp = regexp.MustCompile(`[a-zA-Z0-9_]{1,31}-[a-f0-9]{32}`)
1499

1500
func (conn *ApicConnection) isSyncTag(tag string) bool {
1✔
1501
        return tagRegexp.MatchString(tag) &&
1✔
1502
                strings.HasPrefix(tag, conn.prefix+"-")
1✔
1503
}
1✔
1504

1505
func getRootDn(dn, rootClass string) string {
1✔
1506
        depth := classDepth[rootClass]
1✔
1507
        parts := strings.Split(dn, "/")
1✔
1508
        parts = parts[:depth]
1✔
1509
        return strings.Join(parts, "/")
1✔
1510
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc