• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 10023

06 Nov 2024 10:18AM UTC coverage: 69.589% (+0.04%) from 69.549%
10023

push

travis-pro

jeffinkottaram
debug logs

21 of 24 new or added lines in 1 file covered. (87.5%)

32 existing lines in 2 files now uncovered.

13199 of 18967 relevant lines covered (69.59%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.29
/pkg/apicapi/apicapi.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Interface for connecting to APIC REST API using websockets
16
package apicapi
17

18
import (
19
        "bytes"
20
        "crypto/tls"
21
        "crypto/x509"
22
        "encoding/json"
23
        "errors"
24
        "fmt"
25
        "io"
26
        "net/http"
27
        "net/http/cookiejar"
28
        "regexp"
29
        "sort"
30
        "strconv"
31
        "strings"
32
        "time"
33

34
        "k8s.io/apimachinery/pkg/util/wait"
35
        "k8s.io/client-go/util/workqueue"
36

37
        "github.com/gorilla/websocket"
38
        "github.com/sirupsen/logrus"
39
        "golang.org/x/time/rate"
40
)
41

42
// defaultConnectionRefresh is used as connection refresh interval if
43
// RefreshInterval is set to 0
44
const defaultConnectionRefresh = 30 * time.Second
45

46
// ApicVersion - This global variable to be used when dealing with version-
47
// dependencies during APIC interaction. It gets filled with actual version
48
// as part of runConn()
49
var (
50
        ApicVersion = "3.1"
51
)
52

53
func complete(resp *http.Response) {
1✔
54
        if resp.StatusCode != http.StatusOK {
2✔
55
                rBody, err := io.ReadAll(resp.Body)
1✔
56
                if err != nil {
1✔
57
                        logrus.Errorf("ReadAll :%v", err)
×
58
                } else {
1✔
59
                        logrus.Infof("Resp: %s", rBody)
1✔
60
                }
1✔
61
        }
62
        resp.Body.Close()
1✔
63
}
64

65
// Yes, this is really stupid, but this is really how this works
66
func (conn *ApicConnection) sign(req *http.Request, uri string, body []byte) {
1✔
67
        if conn.signer == nil {
2✔
68
                return
1✔
69
        }
1✔
70

71
        sig, err := conn.signer.sign(req.Method, uri, body)
1✔
72
        if err != nil {
1✔
73
                conn.log.Error("Failed to sign request: ", err)
×
74
                return
×
75
        }
×
76

77
        req.Header.Set("Cookie", conn.apicSigCookie(sig, conn.token))
1✔
78
}
79

80
func (conn *ApicConnection) apicSigCookie(sig, token string) string {
1✔
81
        tokc := ""
1✔
82
        if token != "" {
2✔
83
                tokc = "; APIC-WebSocket-Session=" + token
1✔
84
        }
1✔
85
        return fmt.Sprintf("APIC-Request-Signature=%s; "+
1✔
86
                "APIC-Certificate-Algorithm=v1.0; "+
1✔
87
                "APIC-Certificate-DN=uni/userext/user-%s/usercert-%s.crt; "+
1✔
88
                "APIC-Certificate-Fingerprint=fingerprint%s",
1✔
89
                sig, conn.user, conn.user, tokc)
1✔
90
}
91

92
func (conn *ApicConnection) login() (string, error) {
1✔
93
        var path string
1✔
94
        var method string
1✔
95

1✔
96
        if conn.signer == nil {
2✔
97
                path = "aaaLogin"
1✔
98
                method = "POST"
1✔
99
        } else {
2✔
100
                path = "webtokenSession"
1✔
101
                method = "GET"
1✔
102
        }
1✔
103
        uri := fmt.Sprintf("/api/%s.json", path)
1✔
104
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
105

1✔
106
        var reqBody io.Reader
1✔
107
        var raw []byte
1✔
108
        var err error
1✔
109
        if conn.signer == nil {
2✔
110
                login := &ApicObject{
1✔
111
                        "aaaUser": &ApicObjectBody{
1✔
112
                                Attributes: map[string]interface{}{
1✔
113
                                        "name": conn.user,
1✔
114
                                        "pwd":  conn.password,
1✔
115
                                },
1✔
116
                        },
1✔
117
                }
1✔
118
                raw, err = json.Marshal(login)
1✔
119
                if err != nil {
1✔
120
                        return "", err
×
121
                }
×
122
                reqBody = bytes.NewBuffer(raw)
1✔
123
        }
124
        req, err := http.NewRequest(method, url, reqBody)
1✔
125
        if err != nil {
1✔
126
                return "", err
×
127
        }
×
128
        conn.log.Infof("Req: %+v", req)
1✔
129
        conn.sign(req, uri, raw)
1✔
130
        req.Header.Set("Content-Type", "application/json")
1✔
131
        resp, err := conn.client.Do(req)
1✔
132
        if err != nil {
2✔
133
                return "", err
1✔
134
        }
1✔
135
        defer complete(resp)
1✔
136

1✔
137
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
2✔
138
                conn.logErrorResp("Error while logging into APIC", resp)
1✔
139
                return "", errors.New("Server returned error status")
1✔
140
        }
1✔
141

142
        var apicresp ApicResponse
1✔
143
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
144
        if err != nil {
1✔
145
                return "", err
×
146
        }
×
147

148
        for _, obj := range apicresp.Imdata {
2✔
149
                lresp, ok := obj["aaaLogin"]
1✔
150
                if !ok {
2✔
151
                        lresp, ok = obj["webtokenSession"]
1✔
152
                        if !ok {
1✔
153
                                continue
×
154
                        }
155
                }
156

157
                token, ok := lresp.Attributes["token"]
1✔
158
                if !ok {
1✔
159
                        return "", errors.New("Token not found in login response")
×
160
                }
×
161
                stoken, isStr := token.(string)
1✔
162
                if !isStr {
1✔
163
                        return "", errors.New("Token is not a string")
×
164
                }
×
165
                return stoken, nil
1✔
166
        }
167
        return "", errors.New("Login response not found")
×
168
}
169

170
func configureTls(cert []byte) (*tls.Config, error) {
1✔
171
        if cert == nil {
1✔
172
                return &tls.Config{InsecureSkipVerify: true}, nil
×
173
        }
×
174
        pool := x509.NewCertPool()
1✔
175
        if !pool.AppendCertsFromPEM(cert) {
1✔
176
                return nil, errors.New("Could not load CA certificates")
×
177
        }
×
178
        return &tls.Config{RootCAs: pool}, nil
1✔
179
}
180

181
func New(log *logrus.Logger, apic []string, user string,
182
        password string, privKey []byte, cert []byte,
183
        prefix string, refresh int, refreshTickerAdjust int,
184
        subscriptionDelay int, vrfTenant string, lldpIfHldr func(dn, lldpIf string) bool) (*ApicConnection, error) {
1✔
185
        tls, err := configureTls(cert)
1✔
186
        if err != nil {
1✔
187
                return nil, err
×
188
        }
×
189

190
        var signer *signer
1✔
191
        if privKey != nil {
2✔
192
                signer, err = newSigner(privKey)
1✔
193
                if err != nil {
1✔
194
                        return nil, err
×
195
                }
×
196
        }
197

198
        dialer := &websocket.Dialer{
1✔
199
                TLSClientConfig: tls,
1✔
200
        }
1✔
201
        tr := &http.Transport{
1✔
202
                Proxy:           http.ProxyFromEnvironment,
1✔
203
                TLSClientConfig: dialer.TLSClientConfig,
1✔
204
        }
1✔
205
        jar, err := cookiejar.New(nil)
1✔
206
        if err != nil {
1✔
207
                return nil, err
×
208
        }
×
209
        client := &http.Client{
1✔
210
                Transport: tr,
1✔
211
                Jar:       jar,
1✔
212
                Timeout:   5 * time.Minute,
1✔
213
        }
1✔
214

1✔
215
        conn := &ApicConnection{
1✔
216
                ReconnectInterval:   time.Duration(5) * time.Second,
1✔
217
                ReconnectRetryLimit: 5,
1✔
218
                RefreshInterval:     time.Duration(refresh) * time.Second,
1✔
219
                RefreshTickerAdjust: time.Duration(refreshTickerAdjust) * time.Second,
1✔
220
                SubscriptionDelay:   time.Duration(subscriptionDelay) * time.Millisecond,
1✔
221
                SyncDone:            false,
1✔
222
                signer:              signer,
1✔
223
                dialer:              dialer,
1✔
224
                logger:              log,
1✔
225
                log:                 log.WithField("mod", "APICAPI"),
1✔
226
                Apic:                apic,
1✔
227
                user:                user,
1✔
228
                password:            password,
1✔
229
                prefix:              prefix,
1✔
230
                client:              client,
1✔
231
                vrfTenant:           vrfTenant,
1✔
232
                lldpIfHldr:          lldpIfHldr,
1✔
233
                subscriptions: subIndex{
1✔
234
                        subs: make(map[string]*subscription),
1✔
235
                        ids:  make(map[string]string),
1✔
236
                },
1✔
237
                desiredState:       make(map[string]ApicSlice),
1✔
238
                desiredStateDn:     make(map[string]ApicObject),
1✔
239
                keyHashes:          make(map[string]string),
1✔
240
                containerDns:       make(map[string]bool),
1✔
241
                cachedState:        make(map[string]ApicSlice),
1✔
242
                cacheDnSubIds:      make(map[string]map[string]bool),
1✔
243
                pendingSubDnUpdate: make(map[string]pendingChange),
1✔
244
                CachedSubnetDns:    make(map[string]string),
1✔
245
                cachedLLDPIfs:      make(map[string]string),
1✔
246
        }
1✔
247
        return conn, nil
1✔
248
}
249

250
func (conn *ApicConnection) handleSocketUpdate(apicresp *ApicResponse) {
1✔
251
        var subIds []string
1✔
252
        switch ids := apicresp.SubscriptionId.(type) {
1✔
253
        case string:
×
254
                subIds = append(subIds, ids)
×
255
        case []interface{}:
1✔
256
                for _, id := range ids {
2✔
257
                        subIds = append(subIds, id.(string))
1✔
258
                }
1✔
259
        }
260

261
        nameAttrClass := map[string]bool{"vnsLDevVip": true, "vnsAbsGraph": true, "vzFilter": true, "vzBrCP": true, "l3extInstP": true, "vnsSvcRedirectPol": true, "vnsRedirectHealthGroup": true, "fvIPSLAMonitoringPol": true}
1✔
262

1✔
263
        for _, obj := range apicresp.Imdata {
2✔
264
                for key, body := range obj {
2✔
265
                        if dn, ok := body.Attributes["dn"].(string); ok {
2✔
266
                                if status, isStr := body.Attributes["status"].(string); isStr {
2✔
267
                                        dnSlice := strings.Split(dn, "/")
1✔
268
                                        if len(dnSlice) > 1 && strings.Contains(dnSlice[1], conn.vrfTenant) {
2✔
269
                                                var attr string
1✔
270
                                                if nameAttrClass[key] {
1✔
271
                                                        _, ok := body.Attributes["name"]
×
272
                                                        if ok {
×
273
                                                                attr = body.Attributes["name"].(string)
×
274
                                                        }
×
275
                                                } else if key == "tagAnnotation" {
1✔
276
                                                        _, ok := body.Attributes["value"]
×
277
                                                        if ok {
×
278
                                                                attr = body.Attributes["value"].(string)
×
279
                                                        }
×
280
                                                }
281
                                                if attr != "" && !strings.Contains(attr, conn.prefix) {
1✔
282
                                                        conn.log.Debug("Skipping websocket notification for :", dn)
×
283
                                                        continue
×
284
                                                }
285
                                        }
286
                                        var pendingKind int
1✔
287
                                        if status == "deleted" {
2✔
288
                                                pendingKind = pendingChangeDelete
1✔
289
                                        } else {
2✔
290
                                                pendingKind = pendingChangeUpdate
1✔
291
                                        }
1✔
292
                                        conn.indexMutex.Lock()
1✔
293

1✔
294
                                        conn.logger.WithFields(logrus.Fields{
1✔
295
                                                "mod": "APICAPI",
1✔
296
                                                "dn":  obj.GetDn(),
1✔
297
                                                "obj": obj,
1✔
298
                                        }).Debug("Processing websocket notification for:")
1✔
299

1✔
300
                                        conn.pendingSubDnUpdate[dn] = pendingChange{
1✔
301
                                                kind:    pendingKind,
1✔
302
                                                subIds:  subIds,
1✔
303
                                                isDirty: false,
1✔
304
                                        }
1✔
305
                                        if key == "opflexODev" && conn.odevQueue != nil {
2✔
306
                                                conn.log.Debug("Adding dn to odevQueue: ", dn)
1✔
307
                                                conn.odevQueue.Add(dn)
1✔
308
                                        } else if isPriorityObject(dn) {
2✔
309
                                                conn.log.Debug("Adding dn to priorityQueue: ", dn)
×
310
                                                conn.priorityQueue.Add(dn)
×
311
                                        } else if isLLDPIfObject(dn) && conn.lldpIfQueue != nil {
1✔
312
                                                if lldpIf, ok := body.Attributes["portDesc"].(string); ok {
×
313
                                                        conn.cachedLLDPIfs[dn] = lldpIf
×
314
                                                }
×
315
                                                conn.log.Debug("Adding dn to lldpIfQueue: ", dn)
×
316
                                                conn.lldpIfQueue.Add(dn)
×
317
                                        } else if conn.deltaQueue != nil {
2✔
318
                                                conn.deltaQueue.Add(dn)
1✔
319
                                        }
1✔
320
                                        conn.indexMutex.Unlock()
1✔
321
                                }
322
                        }
323
                }
324
        }
325
}
326

327
func (conn *ApicConnection) restart() {
1✔
328
        conn.indexMutex.Lock()
1✔
329
        if conn.restartCh != nil {
2✔
330
                conn.log.Debug("Restarting connection")
1✔
331
                close(conn.restartCh)
1✔
332
                conn.restartCh = nil
1✔
333
        }
1✔
334
        conn.indexMutex.Unlock()
1✔
335
}
336

337
func (conn *ApicConnection) handleQueuedDn(dn string) bool {
1✔
338
        var respClasses []string
1✔
339
        var updateHandlers []ApicObjectHandler
1✔
340
        var deleteHandlers []ApicDnHandler
1✔
341
        var rootDn string
1✔
342

1✔
343
        handleId := func(id string) {
2✔
344
                conn.indexMutex.Lock()
1✔
345
                if value, ok := conn.subscriptions.ids[id]; ok {
2✔
346
                        if sub, ok := conn.subscriptions.subs[value]; ok {
2✔
347
                                if subComp, ok := sub.childSubs[id]; ok {
1✔
348
                                        respClasses =
×
349
                                                append(respClasses, subComp.respClasses...)
×
350
                                } else {
1✔
351
                                        respClasses =
1✔
352
                                                append(respClasses, sub.respClasses...)
1✔
353
                                }
1✔
354
                                if sub.updateHook != nil {
2✔
355
                                        updateHandlers = append(updateHandlers, sub.updateHook)
1✔
356
                                }
1✔
357
                                if sub.deleteHook != nil {
2✔
358
                                        deleteHandlers = append(deleteHandlers, sub.deleteHook)
1✔
359
                                }
1✔
360

361
                                if sub.kind == apicSubTree {
1✔
362
                                        rootDn = getRootDn(dn, value)
×
363
                                }
×
364
                        }
365
                } else {
×
366
                        conn.log.Warning("Unexpected subscription: ", id)
×
367
                }
×
368
                conn.indexMutex.Unlock()
1✔
369
        }
370

371
        var requeue bool
1✔
372
        conn.indexMutex.Lock()
1✔
373
        pending, hasPendingChange := conn.pendingSubDnUpdate[dn]
1✔
374
        conn.pendingSubDnUpdate[dn] = pendingChange{isDirty: true}
1✔
375
        obj, hasDesiredState := conn.desiredStateDn[dn]
1✔
376
        conn.indexMutex.Unlock()
1✔
377

1✔
378
        if hasPendingChange {
2✔
379
                for _, id := range pending.subIds {
2✔
380
                        handleId(id)
1✔
381
                }
1✔
382
        }
383

384
        if rootDn == "" {
2✔
385
                rootDn = dn
1✔
386
        }
1✔
387

388
        if hasDesiredState {
2✔
389
                if hasPendingChange {
2✔
390
                        if pending.kind == pendingChangeDelete {
2✔
391
                                conn.logger.WithFields(logrus.Fields{"mod": "APICAPI", "DN": dn}).
1✔
392
                                        Warning("Restoring unexpectedly deleted" +
1✔
393
                                                " ACI object")
1✔
394
                                requeue = conn.postDn(dn, obj)
1✔
395
                        } else {
2✔
396
                                conn.log.Debug("getSubtreeDn for:", rootDn)
1✔
397
                                conn.getSubtreeDn(rootDn, respClasses, updateHandlers)
1✔
398
                        }
1✔
399
                } else {
1✔
400
                        requeue = conn.postDn(dn, obj)
1✔
401
                }
1✔
402
        } else {
1✔
403
                if hasPendingChange {
2✔
404
                        if pending.kind == pendingChangeDelete {
2✔
405
                                for _, handler := range deleteHandlers {
2✔
406
                                        handler(dn)
1✔
407
                                }
1✔
408
                        }
409

410
                        if (pending.kind != pendingChangeDelete) || (dn != rootDn) {
2✔
411
                                conn.log.Debug("getSubtreeDn for:", rootDn)
1✔
412
                                conn.getSubtreeDn(rootDn, respClasses, updateHandlers)
1✔
413
                        }
1✔
414
                } else {
1✔
415
                        requeue = conn.Delete(dn)
1✔
416
                }
1✔
417
        }
418

419
        return requeue
1✔
420
}
421

422
func (conn *ApicConnection) processQueue(queue workqueue.RateLimitingInterface,
423
        queueStop <-chan struct{}, name string) {
1✔
424
        go wait.Until(func() {
2✔
425
                conn.log.Debug("Running processQueue for queue ", name)
1✔
426
                for {
2✔
427
                        dn, quit := queue.Get()
1✔
428
                        if quit {
2✔
429
                                break
1✔
430
                        }
431
                        conn.log.Debug("Processing queue for:", dn)
1✔
432
                        var requeue bool
1✔
433
                        if dn, ok := dn.(string); ok {
2✔
434
                                requeue = conn.handleQueuedDn(dn)
1✔
435
                        }
1✔
436
                        if requeue {
1✔
437
                                queue.AddRateLimited(dn)
×
438
                        } else {
1✔
439
                                conn.indexMutex.Lock()
1✔
440
                                if conn.pendingSubDnUpdate[dn.(string)].isDirty {
2✔
441
                                        delete(conn.pendingSubDnUpdate, dn.(string))
1✔
442
                                }
1✔
443
                                conn.indexMutex.Unlock()
1✔
444
                                queue.Forget(dn)
1✔
445
                        }
446
                        queue.Done(dn)
1✔
447
                }
448
        }, time.Second, queueStop)
449
        <-queueStop
1✔
450
        queue.ShutDown()
1✔
451
}
452

453
func (conn *ApicConnection) processLLDPIfQueue(queue workqueue.RateLimitingInterface,
454
        handler func(dn, lldpIf string) bool, stopCh <-chan struct{}) {
1✔
455
        go wait.Until(func() {
2✔
456
                for {
2✔
457
                        key, quit := queue.Get()
1✔
458
                        if quit {
2✔
459
                                break
1✔
460
                        }
461
                        var requeue bool
×
462
                        switch key := key.(type) {
×
463
                        case chan struct{}:
×
464
                                close(key)
×
465
                        case string:
×
466
                                if handler != nil {
×
467
                                        if lldpIf, ok := conn.cachedLLDPIfs[key]; ok {
×
468
                                                requeue = handler(key, lldpIf)
×
469
                                        }
×
470
                                }
471
                        }
472
                        if requeue {
×
473
                                queue.AddRateLimited(key)
×
474
                        } else {
×
475
                                queue.Forget(key)
×
476
                        }
×
477
                        queue.Done(key)
×
478

479
                }
480
        }, time.Second, stopCh)
481
        <-stopCh
1✔
482
        queue.ShutDown()
1✔
483
}
484

485
type fullSync struct{}
486

487
func (conn *ApicConnection) UnsubscribeImmediateDnLocked(dn string,
488
        targetClasses []string) {
×
489
        // Post local delete, subscription will not be refreshed
×
490
        // after refresh time-out
×
491
        delete(conn.subscriptions.subs, dn)
×
492
}
×
493

494
func (conn *ApicConnection) AddImmediateSubscriptionDnLocked(dn string,
495
        targetClasses []string, updateHook ApicObjectHandler, deleteHook ApicDnHandler) bool {
×
496
        conn.logger.WithFields(logrus.Fields{
×
497
                "mod": "APICAPI",
×
498
                "dn":  dn,
×
499
        }).Debug("Adding Subscription for Dn")
×
500

×
501
        conn.subscriptions.subs[dn] = &subscription{
×
502
                kind:          apicSubDn,
×
503
                childSubs:     make(map[string]subComponent),
×
504
                targetClasses: targetClasses,
×
505
                respClasses:   computeRespClasses(targetClasses),
×
506
        }
×
507
        if updateHook != nil {
×
508
                conn.subscriptions.subs[dn].updateHook = updateHook
×
509
        }
×
510
        if deleteHook != nil {
×
511
                conn.subscriptions.subs[dn].deleteHook = deleteHook
×
512
        }
×
513
        return conn.subscribe(dn, conn.subscriptions.subs[dn], true)
×
514
}
515

516
func (conn *ApicConnection) runConn(stopCh <-chan struct{}) {
1✔
517
        done := make(chan struct{})
1✔
518
        restart := make(chan struct{})
1✔
519
        queueStop := make(chan struct{})
1✔
520
        odevQueueStop := make(chan struct{})
1✔
521
        priorityQueueStop := make(chan struct{})
1✔
522
        syncHook := make(chan fullSync, 1)
1✔
523
        conn.restartCh = restart
1✔
524

1✔
525
        go func() {
2✔
526
                defer conn.connection.Close()
1✔
527
                defer close(done)
1✔
528

1✔
529
                for {
2✔
530
                        var apicresp ApicResponse
1✔
531
                        err := conn.connection.ReadJSON(&apicresp)
1✔
532
                        var closeErr *websocket.CloseError
1✔
533
                        if errors.As(err, &closeErr) {
1✔
534
                                conn.log.Info("Websocket connection closed: ", closeErr.Code)
×
535
                                conn.restart()
×
536
                                break
×
537
                        } else if err != nil {
2✔
538
                                conn.log.Error("Could not read web socket message:", err)
1✔
539
                                conn.restart()
1✔
540
                                break
1✔
541
                        } else {
1✔
542
                                conn.handleSocketUpdate(&apicresp)
1✔
543
                        }
1✔
544
                }
545
        }()
546

547
        conn.indexMutex.Lock()
1✔
548
        oldState := conn.cacheDnSubIds
1✔
549
        conn.cachedState = make(map[string]ApicSlice)
1✔
550
        conn.cacheDnSubIds = make(map[string]map[string]bool)
1✔
551
        conn.deltaQueue = workqueue.NewNamedRateLimitingQueue(
1✔
552
                workqueue.NewMaxOfRateLimiter(
1✔
553
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
554
                                10*time.Second),
1✔
555
                        &workqueue.BucketRateLimiter{
1✔
556
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
557
                        },
1✔
558
                ),
1✔
559
                "delta")
1✔
560
        go conn.processQueue(conn.deltaQueue, queueStop, "delta")
1✔
561
        conn.odevQueue = workqueue.NewNamedRateLimitingQueue(
1✔
562
                workqueue.NewMaxOfRateLimiter(
1✔
563
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
564
                                10*time.Second),
1✔
565
                        &workqueue.BucketRateLimiter{
1✔
566
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
567
                        },
1✔
568
                ),
1✔
569
                "odev")
1✔
570
        go conn.processQueue(conn.odevQueue, odevQueueStop, "odev")
1✔
571
        conn.priorityQueue = workqueue.NewNamedRateLimitingQueue(
1✔
572
                workqueue.NewMaxOfRateLimiter(
1✔
573
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
574
                                10*time.Second),
1✔
575
                        &workqueue.BucketRateLimiter{
1✔
576
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
577
                        },
1✔
578
                ),
1✔
579
                "priority")
1✔
580
        go conn.processQueue(conn.priorityQueue, priorityQueueStop, "priority")
1✔
581
        conn.lldpIfQueue = workqueue.NewNamedRateLimitingQueue(
1✔
582
                workqueue.NewMaxOfRateLimiter(
1✔
583
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
584
                                10*time.Second),
1✔
585
                        &workqueue.BucketRateLimiter{
1✔
586
                                Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
1✔
587
                        },
1✔
588
                ),
1✔
589
                "lldpIf")
1✔
590
        go conn.processLLDPIfQueue(conn.lldpIfQueue,
1✔
591
                conn.lldpIfHldr, stopCh)
1✔
592
        conn.indexMutex.Unlock()
1✔
593

1✔
594
        refreshInterval := conn.RefreshInterval
1✔
595
        if refreshInterval == 0 {
1✔
596
                refreshInterval = defaultConnectionRefresh
×
597
        }
×
598
        // Adjust refreshTickerInterval.
599
        // To refresh the subscriptions early than actual refresh timeout value
600
        refreshTickerInterval := refreshInterval - conn.RefreshTickerAdjust
1✔
601
        refreshTicker := time.NewTicker(refreshTickerInterval)
1✔
602
        defer refreshTicker.Stop()
1✔
603

1✔
604
        var hasErr bool
1✔
605
        for value, subscription := range conn.subscriptions.subs {
2✔
606
                if !(conn.subscribe(value, subscription, false)) {
1✔
607
                        hasErr = true
×
608
                        conn.restart()
×
609
                        break
×
610
                }
611
        }
612
        if !hasErr {
2✔
613
                conn.checkDeletes(oldState)
1✔
614
                go func() {
2✔
615
                        if conn.FullSyncHook != nil {
1✔
616
                                conn.FullSyncHook()
×
617
                        }
×
618
                        syncHook <- fullSync{}
1✔
619
                }()
620
        }
621

622
        // Get APIC version if connection restarts
623
        if conn.version == "" && conn.checkVersion {
1✔
624
                go func() {
×
625
                        version, err := conn.GetVersion()
×
626
                        if err != nil {
×
627
                                conn.log.Error("Error while getting APIC version: ", err, " Restarting connection...")
×
628
                                conn.restart()
×
629
                        } else {
×
630
                                conn.log.Debug("Cached version:", conn.CachedVersion, " New version:", version)
×
631
                                if ApicVersion != version {
×
632
                                        ApicVersion = version
×
633
                                        if ApicVersion >= "6.0(4c)" {
×
634
                                                metadata["fvBD"].attributes["serviceBdRoutingDisable"] = "no"
×
635
                                        } else {
×
636
                                                delete(metadata["fvBD"].attributes, "serviceBdRoutingDisable")
×
637
                                        }
×
638
                                        conn.VersionUpdateHook()
×
639
                                }
640
                                conn.CachedVersion = version
×
641
                        }
642
                }()
643
        }
644

645
        closeConn := func(stop bool) {
2✔
646
                close(queueStop)
1✔
647
                close(odevQueueStop)
1✔
648

1✔
649
                conn.indexMutex.Lock()
1✔
650
                conn.deltaQueue = nil
1✔
651
                conn.odevQueue = nil
1✔
652
                conn.priorityQueue = nil
1✔
653
                conn.lldpIfQueue = nil
1✔
654
                conn.stopped = stop
1✔
655
                conn.syncEnabled = false
1✔
656
                conn.subscriptions.ids = make(map[string]string)
1✔
657
                conn.version = ""
1✔
658
                conn.indexMutex.Unlock()
1✔
659

1✔
660
                conn.log.Debug("Shutting down web socket")
1✔
661
                err := conn.connection.WriteMessage(websocket.CloseMessage,
1✔
662
                        websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
1✔
663
                if err != nil {
1✔
664
                        conn.log.Error("Error while closing socket: ", err)
×
665
                } else {
1✔
666
                        select {
1✔
667
                        case <-done:
1✔
668
                        case <-time.After(time.Second):
×
669
                        }
670
                }
671
                conn.connection.Close()
1✔
672
        }
673

674
loop:
1✔
675
        for {
2✔
676
                select {
1✔
677
                case <-syncHook:
1✔
678
                        conn.fullSync()
1✔
679
                case <-refreshTicker.C:
1✔
680
                        conn.refresh()
1✔
681
                case <-restart:
1✔
682
                        closeConn(false)
1✔
683
                        break loop
1✔
684
                case <-stopCh:
1✔
685
                        closeConn(true)
1✔
686
                        break loop
1✔
687
                }
688
        }
689

690
        conn.log.Debug("Exiting websocket handler")
1✔
691
}
692

693
// This function should only to be called before we make the first connection to APIC.
694
// Use the cached Apic version when determining the version elsewhere. This can lead to inconsistent tokens.
695
func (conn *ApicConnection) GetVersion() (string, error) {
1✔
696
        versionMo := "firmwareCtrlrRunning"
1✔
697

1✔
698
        if len(conn.Apic) == 0 {
1✔
699
                return "", errors.New("No APIC configuration")
×
700
        }
×
701

702
        conn.checkVersion = true // enable version check on websocket reconnect
1✔
703
        // To Handle unit-tests
1✔
704
        if strings.Contains(conn.Apic[conn.ApicIndex], "127.0.0.1") {
2✔
705
                conn.version = "4.2(4i)"
1✔
706
                conn.SnatPbrFltrChain = true
1✔
707
                conn.log.Debug("Returning APIC version 4.2(4i) for test server")
1✔
708
                return conn.version, nil
1✔
709
        }
1✔
710

711
        uri := fmt.Sprintf("/api/node/class/%s.json?&", versionMo)
×
712
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
×
713

×
714
        retries := 0
×
715
        for conn.version == "" {
×
716
                if retries <= conn.ReconnectRetryLimit {
×
717
                        // Wait before Retry.
×
718
                        time.Sleep(conn.ReconnectInterval)
×
719
                        retries++
×
720
                } else {
×
721
                        return "", fmt.Errorf("Failed to get APIC version after %d retries", retries)
×
722
                }
×
723

724
                token, err := conn.login()
×
725
                if err != nil {
×
726
                        conn.log.Error("Failed to log into APIC: ", err)
×
727
                        continue
×
728
                }
729
                conn.token = token
×
730

×
731
                req, err := http.NewRequest("GET", url, http.NoBody)
×
732
                if err != nil {
×
733
                        conn.log.Error("Could not create request:", err)
×
734
                        continue
×
735
                }
736
                conn.sign(req, uri, nil)
×
737
                resp, err := conn.client.Do(req)
×
738
                if err != nil {
×
739
                        conn.log.Error("Could not get response for ", versionMo, ": ", err)
×
740
                        continue
×
741
                }
742
                defer complete(resp)
×
743
                if resp.StatusCode < 200 || resp.StatusCode >= 300 {
×
744
                        conn.logErrorResp("Could not get response for "+versionMo, resp)
×
745
                        conn.log.Debug("Request:", req)
×
746
                        continue
×
747
                }
748

749
                var apicresp ApicResponse
×
750
                err = json.NewDecoder(resp.Body).Decode(&apicresp)
×
751
                if err != nil {
×
752
                        conn.log.Error("Could not parse APIC response: ", err)
×
753
                        continue
×
754
                }
755
                for _, obj := range apicresp.Imdata {
×
756
                        vresp := obj["firmwareCtrlrRunning"]
×
757
                        version, ok := vresp.Attributes["version"]
×
758
                        if !ok {
×
759
                                conn.log.Debug("No version attribute in the response??!")
×
760
                                conn.logger.WithFields(logrus.Fields{
×
761
                                        "mod":                            "APICAPI",
×
762
                                        "firmwareCtrlrRunning":           vresp,
×
763
                                        "firmwareCtrlRunning Attributes": vresp.Attributes,
×
764
                                }).Debug("Response:")
×
765
                        } else {
×
766
                                switch version := version.(type) {
×
767
                                default:
×
768
                                case string:
×
769
                                        version_split := strings.Split(version, "(")
×
770
                                        version_number, err := strconv.ParseFloat(version_split[0], 64)
×
771
                                        conn.log.Info("Actual APIC version:", version, " Stripped out version:", version_number)
×
772
                                        if err == nil {
×
773
                                                conn.version = version //return the actual version
×
774
                                        }
×
775
                                }
776
                        }
777
                }
778
        }
779
        return conn.version, nil
×
780
}
781

782
func (conn *ApicConnection) Run(stopCh <-chan struct{}) {
1✔
783
        if len(conn.Apic) == 0 {
1✔
784
                conn.log.Warning("APIC connection not configured")
×
785
                return
×
786
        }
×
787

788
        if conn.version >= "6.0(4c)" {
1✔
789
                metadata["fvBD"].attributes["serviceBdRoutingDisable"] = "no"
×
790
        }
×
791

792
        for !conn.stopped {
2✔
793
                func() {
2✔
794
                        defer func() {
2✔
795
                                conn.ApicIndex = (conn.ApicIndex + 1) % len(conn.Apic)
1✔
796
                                time.Sleep(conn.ReconnectInterval)
1✔
797
                        }()
1✔
798

799
                        conn.logger.WithFields(logrus.Fields{
1✔
800
                                "mod":  "APICAPI",
1✔
801
                                "host": conn.Apic[conn.ApicIndex],
1✔
802
                        }).Info("Connecting to APIC")
1✔
803

1✔
804
                        for dn := range conn.subscriptions.subs {
2✔
805
                                conn.subscriptions.subs[dn].childSubs = make(map[string]subComponent)
1✔
806
                        }
1✔
807
                        conn.subscriptions.ids = make(map[string]string)
1✔
808

1✔
809
                        token, err := conn.login()
1✔
810
                        if err != nil {
2✔
811
                                conn.log.Error("Failed to log into APIC: ", err)
1✔
812
                                return
1✔
813
                        }
1✔
814
                        conn.token = token
1✔
815

1✔
816
                        uri := fmt.Sprintf("/socket%s", token)
1✔
817
                        url := fmt.Sprintf("wss://%s%s",
1✔
818
                                conn.Apic[conn.ApicIndex], uri)
1✔
819
                        header := make(http.Header)
1✔
820
                        if conn.signer != nil {
2✔
821
                                sig, err := conn.signer.sign("GET", uri, nil)
1✔
822
                                if err != nil {
1✔
823
                                        conn.log.Error("Failed to sign request: ", err)
×
824
                                        return
×
825
                                }
×
826
                                header.Set("Cookie", conn.apicSigCookie(sig, token))
1✔
827
                        }
828

829
                        conn.connection, _, err = conn.dialer.Dial(url, header)
1✔
830
                        if err != nil {
1✔
831
                                conn.log.Error("Failed to open APIC websocket: ", err)
×
832
                                return
×
833
                        }
×
834
                        conn.log.Info("Websocket connected!")
1✔
835
                        conn.runConn(stopCh)
1✔
836
                }()
837
        }
838
}
839

840
func (conn *ApicConnection) refresh() {
1✔
841
        if conn.signer == nil {
2✔
842
                url := fmt.Sprintf("https://%s/api/aaaRefresh.json",
1✔
843
                        conn.Apic[conn.ApicIndex])
1✔
844
                req, err := http.NewRequest("GET", url, http.NoBody)
1✔
845
                if err != nil {
1✔
846
                        conn.log.Error("Could not create request: ", err)
×
847
                        return
×
848
                }
×
849
                resp, err := conn.client.Do(req)
1✔
850
                if err != nil {
2✔
851
                        conn.log.Error("Failed to refresh APIC session: ", err)
1✔
852
                        conn.restart()
1✔
853
                        return
1✔
854
                }
1✔
855
                if resp.StatusCode < 200 || resp.StatusCode >= 300 {
2✔
856
                        conn.logErrorResp("Error while refreshing login", resp)
1✔
857
                        complete(resp)
1✔
858
                        conn.restart()
1✔
859
                        return
1✔
860
                }
1✔
861
                complete(resp)
1✔
862
                conn.log.Debugf("Refresh: url %v", url)
1✔
863
        }
864

865
        for _, sub := range conn.subscriptions.subs {
2✔
866
                refreshId := func(id string) {
2✔
867
                        uri := fmt.Sprintf("/api/subscriptionRefresh.json?id=%s", id)
1✔
868
                        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
869
                        req, err := http.NewRequest("GET", url, http.NoBody)
1✔
870
                        if err != nil {
1✔
871
                                conn.log.Error("Could not create request: ", err)
×
872
                                return
×
873
                        }
×
874
                        conn.sign(req, uri, nil)
1✔
875
                        resp, err := conn.client.Do(req)
1✔
876
                        if err != nil {
1✔
UNCOV
877
                                conn.log.Error("Failed to refresh APIC subscription: ", err)
×
UNCOV
878
                                conn.restart()
×
UNCOV
879
                                return
×
UNCOV
880
                        }
×
881
                        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
2✔
882
                                conn.logErrorResp("Error while refreshing subscription", resp)
1✔
883
                                complete(resp)
1✔
884
                                conn.restart()
1✔
885
                                return
1✔
886
                        }
1✔
887
                        complete(resp)
×
888
                        conn.log.Debugf("Refresh sub: url %v", url)
×
889
                        time.Sleep(conn.SubscriptionDelay)
×
890
                }
891
                if len(sub.childSubs) > 0 {
1✔
892
                        for id := range sub.childSubs {
×
893
                                refreshId(id)
×
894
                        }
×
895
                } else {
1✔
896
                        refreshId(sub.id)
1✔
897
                }
1✔
898
        }
899
}
900

901
func (conn *ApicConnection) logErrorResp(message string, resp *http.Response) {
1✔
902
        var apicresp ApicResponse
1✔
903
        err := json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
904
        if err != nil {
1✔
905
                conn.log.Error("Could not parse APIC error response: ", err)
×
906
        } else {
1✔
907
                code := 0
1✔
908
                text := ""
1✔
909
                for _, o := range apicresp.Imdata {
2✔
910
                        if ob, ok := o["error"]; ok {
2✔
911
                                if ob.Attributes != nil {
2✔
912
                                        if t, isStr := ob.Attributes["text"].(string); isStr {
2✔
913
                                                text = t
1✔
914
                                        }
1✔
915
                                        if c, isInt := ob.Attributes["code"].(int); isInt {
1✔
916
                                                code = c
×
917
                                        }
×
918
                                }
919
                        }
920
                }
921
                conn.logger.WithFields(logrus.Fields{
1✔
922
                        "mod":    "APICAPI",
1✔
923
                        "text":   text,
1✔
924
                        "code":   code,
1✔
925
                        "url":    resp.Request.URL,
1✔
926
                        "status": resp.StatusCode,
1✔
927
                }).Error(message)
1✔
928
        }
929
}
930

931
// To make sure cluster's POD/NodeBDs and L3OUT are all mapped
932
// to same and correct VRF.
933
func (conn *ApicConnection) ValidateAciVrfAssociation(acivrfdn string, expectedVrfRelations []string) error {
×
934
        var aciVrfBdL3OuttDns []string
×
935
        args := []string{
×
936
                "query-target=subtree",
×
937
                "target-subtree-class=fvRtCtx,fvRtEctx",
×
938
        }
×
939

×
940
        uri := fmt.Sprintf("/api/mo/%s.json?%s", acivrfdn, strings.Join(args, "&"))
×
941
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
×
942
        req, err := http.NewRequest("GET", url, http.NoBody)
×
943
        if err != nil {
×
944
                conn.log.Error("Could not create request: ", err)
×
945
                return err
×
946
        }
×
947
        conn.sign(req, uri, nil)
×
948
        resp, err := conn.client.Do(req)
×
949
        if err != nil {
×
950
                conn.log.Error("Could not get subtree for ", acivrfdn, ": ", err)
×
951
                return err
×
952
        }
×
953
        defer complete(resp)
×
954
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
×
955
                conn.logErrorResp("Could not get subtree for "+acivrfdn, resp)
×
956
                return err
×
957
        }
×
958

959
        var apicresp ApicResponse
×
960
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
×
961
        if err != nil {
×
962
                conn.log.Error("Could not parse APIC response: ", err)
×
963
                return err
×
964
        }
×
965

966
        for _, obj := range apicresp.Imdata {
×
967
                for _, body := range obj {
×
968
                        tDn, ok := body.Attributes["tDn"].(string)
×
969
                        if !ok {
×
970
                                continue
×
971
                        }
972
                        aciVrfBdL3OuttDns = append(aciVrfBdL3OuttDns, tDn)
×
973
                }
974
        }
975
        sort.Strings(aciVrfBdL3OuttDns)
×
976
        conn.log.Debug("aciVrfBdL3OuttDns:", aciVrfBdL3OuttDns)
×
977
        for _, expectedDn := range expectedVrfRelations {
×
978
                i := sort.SearchStrings(aciVrfBdL3OuttDns, expectedDn)
×
979
                if !(i < len(aciVrfBdL3OuttDns) && aciVrfBdL3OuttDns[i] == expectedDn) {
×
980
                        conn.log.Debug("Missing (or) Incorrect Vrf association: ", expectedDn)
×
981
                        return errors.New("Incorrect Pod/NodeBD/L3OUT VRF association")
×
982
                }
×
983
        }
984
        return nil
×
985
}
986

987
func (conn *ApicConnection) getSubtreeDn(dn string, respClasses []string,
988
        updateHandlers []ApicObjectHandler) {
1✔
989
        args := []string{
1✔
990
                "rsp-subtree=full",
1✔
991
        }
1✔
992

1✔
993
        if len(respClasses) > 0 {
2✔
994
                args = append(args, "rsp-subtree-class="+strings.Join(respClasses, ","))
1✔
995
        }
1✔
996
        // properly encoding the URI query parameters breaks APIC
997
        uri := fmt.Sprintf("/api/mo/%s.json?%s", dn, strings.Join(args, "&"))
1✔
998
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
999
        conn.log.Debugf("URL: %v", url)
1✔
1000
        req, err := http.NewRequest("GET", url, http.NoBody)
1✔
1001
        if err != nil {
1✔
1002
                conn.log.Error("Could not create request: ", err)
×
1003
                return
×
1004
        }
×
1005
        conn.sign(req, uri, nil)
1✔
1006
        resp, err := conn.client.Do(req)
1✔
1007
        if err != nil {
1✔
1008
                conn.log.Error("Could not get subtree for ", dn, ": ", err)
×
1009
                conn.restart()
×
1010
                return
×
1011
        }
×
1012
        defer complete(resp)
1✔
1013
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
1✔
1014
                conn.logErrorResp("Could not get subtree for "+dn, resp)
×
1015
                conn.restart()
×
1016
                return
×
1017
        }
×
1018

1019
        var apicresp ApicResponse
1✔
1020
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
1021
        if err != nil {
1✔
1022
                conn.log.Error("Could not parse APIC response: ", err)
×
1023
                return
×
1024
        }
×
1025
        if len(apicresp.Imdata) == 0 {
1✔
1026
                conn.log.Debugf("No subtree found for dn %s", dn)
×
1027
        }
×
1028

1029
        for _, obj := range apicresp.Imdata {
2✔
1030
                conn.logger.WithFields(logrus.Fields{
1✔
1031
                        "mod": "APICAPI",
1✔
1032
                        "dn":  obj.GetDn(),
1✔
1033
                        "obj": obj,
1✔
1034
                }).Debug("Object updated on APIC")
1✔
1035
                var count int
1✔
1036
                prepareApicCache("", obj, &count)
1✔
1037

1✔
1038
                handled := false
1✔
1039
                for _, handler := range updateHandlers {
1✔
1040
                        if handler(obj) {
×
1041
                                handled = true
×
1042
                                break
×
1043
                        }
1044
                }
1045
                if handled {
1✔
1046
                        continue
×
1047
                }
1048
                conn.reconcileApicObject(obj)
1✔
1049
        }
1050
}
1051

1052
func (conn *ApicConnection) queuePriorityDn(dn string) {
×
1053
        conn.indexMutex.Lock()
×
1054
        if conn.priorityQueue != nil {
×
1055
                conn.priorityQueue.Add(dn)
×
1056
        }
×
1057
        conn.indexMutex.Unlock()
×
1058
}
1059

1060
func (conn *ApicConnection) queueDn(dn string) {
1✔
1061
        conn.indexMutex.Lock()
1✔
1062
        if conn.deltaQueue != nil {
2✔
1063
                conn.deltaQueue.Add(dn)
1✔
1064
        }
1✔
1065
        conn.indexMutex.Unlock()
1✔
1066
}
1067

1068
func (conn *ApicConnection) ForceRelogin() {
×
1069
        conn.token = ""
×
1070
}
×
1071

1072
func (conn *ApicConnection) PostDnInline(dn string, obj ApicObject) error {
×
1073
        conn.logger.WithFields(logrus.Fields{
×
1074
                "mod": "APICAPI",
×
1075
                "dn":  dn,
×
1076
                "obj": obj,
×
1077
        }).Debug("Posting Dn Inline")
×
1078
        if conn.token == "" {
×
1079
                token, err := conn.login()
×
1080
                if err != nil {
×
1081
                        conn.log.Errorf("Login: %v", err)
×
1082
                        return err
×
1083
                }
×
1084
                conn.token = token
×
1085
        }
1086
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
×
1087
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
×
1088
        raw, err := json.Marshal(obj)
×
1089
        if err != nil {
×
1090
                conn.log.Error("Could not serialize object for dn ", dn, ": ", err)
×
1091
                return err
×
1092
        }
×
1093
        req, err := http.NewRequest("POST", url, bytes.NewBuffer(raw))
×
1094
        if err != nil {
×
1095
                conn.log.Error("Could not create request: ", err)
×
1096
                return err
×
1097
        }
×
1098
        conn.sign(req, uri, raw)
×
1099
        req.Header.Set("Content-Type", "application/json")
×
1100
        conn.log.Infof("Post: %+v", req)
×
1101
        resp, err := conn.client.Do(req)
×
1102
        if err != nil {
×
1103
                conn.log.Error("Could not update dn ", dn, ": ", err)
×
1104
                return err
×
1105
        }
×
1106

1107
        complete(resp)
×
1108
        if resp.StatusCode != http.StatusOK {
×
1109
                return fmt.Errorf("Status: %v", resp.StatusCode)
×
1110
        }
×
1111
        return nil
×
1112
}
1113

1114
func (conn *ApicConnection) DeleteDnInline(dn string) error {
1✔
1115
        conn.logger.WithFields(logrus.Fields{
1✔
1116
                "mod": "APICAPI",
1✔
1117
                "dn":  dn,
1✔
1118
        }).Debug("Deleting Dn Inline")
1✔
1119
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
1✔
1120
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1121
        req, err := http.NewRequest("DELETE", url, http.NoBody)
1✔
1122
        if err != nil {
1✔
1123
                conn.log.Error("Could not create delete request: ", err)
×
1124
                return err
×
1125
        }
×
1126
        conn.sign(req, uri, nil)
1✔
1127
        resp, err := conn.client.Do(req)
1✔
1128
        if err != nil {
2✔
1129
                conn.log.Error("Could not delete dn ", dn, ": ", err)
1✔
1130
                return err
1✔
1131
        }
1✔
1132
        defer complete(resp)
×
1133
        return nil
×
1134
}
1135

1136
func (conn *ApicConnection) postDn(dn string, obj ApicObject) bool {
1✔
1137
        conn.logger.WithFields(logrus.Fields{
1✔
1138
                "mod": "APICAPI",
1✔
1139
                "dn":  dn,
1✔
1140
                "obj": obj,
1✔
1141
        }).Debug("Posting Dn")
1✔
1142

1✔
1143
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
1✔
1144
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1145
        raw, err := json.Marshal(obj)
1✔
1146
        if err != nil {
1✔
1147
                conn.log.Error("Could not serialize object for dn ", dn, ": ", err)
×
1148
        }
×
1149
        req, err := http.NewRequest("POST", url, bytes.NewBuffer(raw))
1✔
1150
        if err != nil {
1✔
1151
                conn.log.Error("Could not create request: ", err)
×
1152
                conn.restart()
×
1153
                return false
×
1154
        }
×
1155
        conn.sign(req, uri, raw)
1✔
1156
        req.Header.Set("Content-Type", "application/json")
1✔
1157
        resp, err := conn.client.Do(req)
1✔
1158
        if err != nil {
1✔
1159
                conn.log.Error("Could not update dn ", dn, ": ", err)
×
1160
                conn.restart()
×
1161
                return false
×
1162
        }
×
1163
        defer complete(resp)
1✔
1164
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
1✔
1165
                conn.logErrorResp("Could not update dn "+dn, resp)
×
1166
                if resp.StatusCode == 400 {
×
1167
                        return true
×
1168
                }
×
1169
                conn.restart()
×
1170
        }
1171
        return false
1✔
1172
}
1173

1174
func (conn *ApicConnection) Delete(dn string) bool {
1✔
1175
        if dn == "" {
1✔
1176
                conn.log.Debug("Skip delete for empty Dn: ")
×
1177
                return false
×
1178
        }
×
1179
        dnSlice := strings.Split(dn, "/")
1✔
1180
        identifier := dnSlice[len(dnSlice)-1]
1✔
1181
        iSlice := strings.SplitN(identifier, "-", 2)
1✔
1182
        if len(iSlice) == 2 {
2✔
1183
                if iSlice[0] == "ip" {
1✔
1184
                        addr := strings.Trim(iSlice[1], "[]")
×
1185
                        obj := NewDeleteHostprotRemoteIp(addr)
×
1186
                        conn.log.Debug("Posting delete of dn ", dn)
×
1187
                        return conn.postDn(dn, obj)
×
1188
                } else if iSlice[0] == "odev" {
1✔
1189
                        conn.log.Debug("Skipping delete of opflexODev : ", dn)
×
1190
                        return false
×
1191
                }
×
1192
        }
1193
        return conn.DeleteDn(dn)
1✔
1194
}
1195

1196
func (conn *ApicConnection) DeleteDn(dn string) bool {
1✔
1197
        if dn == "" {
1✔
1198
                conn.log.Debug("Skip delete for empty Dn: ")
×
1199
                return false
×
1200
        }
×
1201
        conn.logger.WithFields(logrus.Fields{
1✔
1202
                "mod": "APICAPI",
1✔
1203
                "dn":  dn,
1✔
1204
        }).Debug("Deleting Dn")
1✔
1205
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
1✔
1206
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1207
        req, err := http.NewRequest("DELETE", url, http.NoBody)
1✔
1208
        if err != nil {
1✔
1209
                conn.log.Error("Could not create delete request: ", err)
×
1210
                conn.restart()
×
1211
                return false
×
1212
        }
×
1213
        conn.sign(req, uri, nil)
1✔
1214
        resp, err := conn.client.Do(req)
1✔
1215
        if err != nil {
1✔
1216
                conn.log.Error("Could not delete dn ", dn, ": ", err)
×
1217
                conn.restart()
×
1218
                return false
×
1219
        }
×
1220
        defer complete(resp)
1✔
1221
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
1✔
1222
                conn.logErrorResp("Could not delete dn "+dn, resp)
×
1223
                conn.restart()
×
1224
        }
×
1225
        return false
1✔
1226
}
1227

1228
func doComputeRespClasses(targetClasses []string,
1229
        visited map[string]bool) {
1✔
1230
        for _, class := range targetClasses {
2✔
1231
                if visited[class] {
1✔
1232
                        continue
×
1233
                }
1234
                visited[class] = true
1✔
1235
                if md, ok := metadata[class]; ok {
2✔
1236
                        doComputeRespClasses(md.children, visited)
1✔
1237
                }
1✔
1238
        }
1239
}
1240

1241
func computeRespClasses(targetClasses []string) []string {
1✔
1242
        visited := make(map[string]bool)
1✔
1243
        doComputeRespClasses(targetClasses, visited)
1✔
1244

1✔
1245
        // Don't include targetclasses in rsp-subtree
1✔
1246
        // because they are implicitly included
1✔
1247
        for i := range targetClasses {
2✔
1248
                delete(visited, targetClasses[i])
1✔
1249
        }
1✔
1250

1251
        var respClasses []string
1✔
1252
        for class := range visited {
2✔
1253
                respClasses = append(respClasses, class)
1✔
1254
        }
1✔
1255
        respClasses = append(respClasses, "tagAnnotation")
1✔
1256
        return respClasses
1✔
1257
}
1258

1259
// AddSubscriptionTree subscribe at a subtree level. class specifies
1260
// the root. Changes will cause entire subtree of the rootdn to be fetched
1261
func (conn *ApicConnection) AddSubscriptionTree(class string,
1262
        targetClasses []string, targetFilter string) {
1✔
1263
        if _, ok := classDepth[class]; !ok {
1✔
1264
                errStr := fmt.Sprintf("classDepth not for class %s", class)
×
1265
                panic(errStr)
×
1266
        }
1267

1268
        conn.indexMutex.Lock()
1✔
1269
        conn.subscriptions.subs[class] = &subscription{
1✔
1270
                kind:          apicSubTree,
1✔
1271
                childSubs:     make(map[string]subComponent),
1✔
1272
                targetClasses: targetClasses,
1✔
1273
                targetFilter:  targetFilter,
1✔
1274
        }
1✔
1275
        conn.indexMutex.Unlock()
1✔
1276
}
1277

1278
func (conn *ApicConnection) AddSubscriptionClass(class string,
1279
        targetClasses []string, targetFilter string) {
1✔
1280
        conn.indexMutex.Lock()
1✔
1281
        conn.subscriptions.subs[class] = &subscription{
1✔
1282
                kind:          apicSubClass,
1✔
1283
                childSubs:     make(map[string]subComponent),
1✔
1284
                targetClasses: targetClasses,
1✔
1285
                respClasses:   computeRespClasses(targetClasses),
1✔
1286
                targetFilter:  targetFilter,
1✔
1287
        }
1✔
1288
        conn.indexMutex.Unlock()
1✔
1289
}
1✔
1290

1291
func (conn *ApicConnection) AddSubscriptionDn(dn string,
1292
        targetClasses []string) {
1✔
1293
        conn.logger.WithFields(logrus.Fields{
1✔
1294
                "mod": "APICAPI",
1✔
1295
                "dn":  dn,
1✔
1296
        }).Debug("Adding Subscription for Dn")
1✔
1297

1✔
1298
        conn.indexMutex.Lock()
1✔
1299
        conn.subscriptions.subs[dn] = &subscription{
1✔
1300
                kind:          apicSubDn,
1✔
1301
                childSubs:     make(map[string]subComponent),
1✔
1302
                targetClasses: targetClasses,
1✔
1303
                respClasses:   computeRespClasses(targetClasses),
1✔
1304
        }
1✔
1305
        conn.indexMutex.Unlock()
1✔
1306
}
1✔
1307

1308
func (conn *ApicConnection) SetSubscriptionHooks(value string,
1309
        updateHook ApicObjectHandler, deleteHook ApicDnHandler) {
1✔
1310
        conn.indexMutex.Lock()
1✔
1311
        if s, ok := conn.subscriptions.subs[value]; ok {
2✔
1312
                s.updateHook = updateHook
1✔
1313
                s.deleteHook = deleteHook
1✔
1314
        }
1✔
1315
        conn.indexMutex.Unlock()
1✔
1316
}
1317

1318
func (conn *ApicConnection) GetApicResponse(uri string) (ApicResponse, error) {
1✔
1319
        conn.log.Debug("apicIndex: ", conn.Apic[conn.ApicIndex], " uri: ", uri)
1✔
1320
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1321
        var apicresp ApicResponse
1✔
1322
        conn.log.Debug("Apic Get url: ", url)
1✔
1323
        req, err := http.NewRequest("GET", url, http.NoBody)
1✔
1324
        if err != nil {
1✔
1325
                conn.log.Error("Could not create request: ", err)
×
1326
                return apicresp, err
×
1327
        }
×
1328
        conn.sign(req, uri, nil)
1✔
1329
        resp, err := conn.client.Do(req)
1✔
1330
        if err != nil {
1✔
1331
                conn.log.Error("Could not get response for ", url, ": ", err)
×
1332
                return apicresp, err
×
1333
        }
×
1334
        defer complete(resp)
1✔
1335
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
1✔
1336
                conn.logErrorResp("Could not get subtree for "+url, resp)
×
1337
                return apicresp, err
×
1338
        }
×
1339
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
1340
        if err != nil {
1✔
1341
                conn.log.Error("Could not parse APIC response: ", err)
×
1342
                return apicresp, err
×
1343
        }
×
1344
        return apicresp, nil
1✔
1345
}
1346

1347
func (conn *ApicConnection) doSubscribe(args []string,
1348
        kind, value, refresh_interval string, apicresp *ApicResponse) bool {
1✔
1349
        // properly encoding the URI query parameters breaks APIC
1✔
1350
        uri := fmt.Sprintf("/api/%s/%s.json?subscription=yes&%s%s",
1✔
1351
                kind, value, refresh_interval, strings.Join(args, "&"))
1✔
1352
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1353
        conn.log.Info("APIC connection URL: ", url)
1✔
1354

1✔
1355
        req, err := http.NewRequest("GET", url, http.NoBody)
1✔
1356
        if err != nil {
1✔
1357
                conn.log.Error("Could not create request: ", err)
×
1358
                return false
×
1359
        }
×
1360
        conn.sign(req, uri, nil)
1✔
1361
        resp, err := conn.client.Do(req)
1✔
1362
        if err != nil {
1✔
1363
                conn.log.Error("Failed to subscribe to ", value, ": ", err)
×
1364
                return false
×
1365
        }
×
1366
        defer complete(resp)
1✔
1367
        if resp.StatusCode < 200 || resp.StatusCode >= 300 {
1✔
1368
                conn.logErrorResp("Could not subscribe to "+value, resp)
×
1369
                return false
×
1370
        }
×
1371

1372
        err = json.NewDecoder(resp.Body).Decode(apicresp)
1✔
1373
        if err != nil {
1✔
1374
                conn.log.Error("Could not decode APIC response", err)
×
1375
                return false
×
1376
        }
×
1377
        time.Sleep(conn.SubscriptionDelay)
1✔
1378
        return true
1✔
1379
}
1380

1381
func (conn *ApicConnection) subscribe(value string, sub *subscription, lockHeld bool) bool {
1✔
1382
        baseArgs := []string{
1✔
1383
                "query-target=subtree",
1✔
1384
                "rsp-subtree=full",
1✔
1385
                "target-subtree-class=" + strings.Join(sub.targetClasses, ","),
1✔
1386
        }
1✔
1387

1✔
1388
        const defaultArgs = 1
1✔
1389
        var argCount = defaultArgs
1✔
1390
        var combinableSubClasses, separableSubClasses []string
1✔
1391
        var splitTargetClasses [][]string
1✔
1392
        var splitRespClasses [][]string
1✔
1393
        var argSet [][]string
1✔
1394
        argSet = make([][]string, defaultArgs)
1✔
1395
        argSet[defaultArgs-1] = make([]string, len(baseArgs))
1✔
1396
        copy(argSet[defaultArgs-1], baseArgs)
1✔
1397
        if sub.respClasses != nil {
2✔
1398
                separateClasses := func(classes []string, combClasses, sepClasses *[]string) {
2✔
1399
                        for i := range classes {
2✔
1400
                                if classMeta, ok := metadata[classes[i]]; ok {
2✔
1401
                                        if classes[i] == "tagAnnotation" {
2✔
1402
                                                continue
1✔
1403
                                        }
1404
                                        if classMeta.hints != nil && classMeta.hints["cardinality"] == "high" {
2✔
1405
                                                *sepClasses = append(*sepClasses, classes[i])
1✔
1406
                                                continue
1✔
1407
                                        }
1408
                                        *combClasses = append(*combClasses, classes[i])
1✔
1409
                                }
1410
                        }
1411
                }
1412
                separateClasses(sub.respClasses, &combinableSubClasses, &separableSubClasses)
1✔
1413

1✔
1414
                // In case there are high cardinality children, we register for all the classes individually.
1✔
1415
                // The concept of target-subtree and rsp-subtree class cannot be used because of the tagAnnotation object
1✔
1416
                // vmmInjectedLabel is added for every object, so getting it separately will not be scalable
1✔
1417
                if len(separableSubClasses) > 0 {
2✔
1418
                        separateClasses(sub.targetClasses, &combinableSubClasses, &separableSubClasses)
1✔
1419
                        separableSubClasses = append(separableSubClasses, combinableSubClasses...)
1✔
1420
                        baseArgs = []string{
1✔
1421
                                "query-target=subtree",
1✔
1422
                                "rsp-subtree=children",
1✔
1423
                        }
1✔
1424
                        subscribingClasses := make(map[string]bool)
1✔
1425
                        argSet = make([][]string, len(separableSubClasses))
1✔
1426
                        splitTargetClasses = make([][]string, len(separableSubClasses))
1✔
1427
                        splitRespClasses = make([][]string, len(separableSubClasses))
1✔
1428

1✔
1429
                        argCount = 0
1✔
1430
                        for i := range separableSubClasses {
2✔
1431
                                // Eliminate duplicates
1✔
1432
                                if _, ok := subscribingClasses[separableSubClasses[i]]; ok {
1✔
1433
                                        continue
×
1434
                                }
1435
                                subscribingClasses[separableSubClasses[i]] = true
1✔
1436
                                argSet[argCount] = make([]string, len(baseArgs))
1✔
1437
                                copy(argSet[argCount], baseArgs)
1✔
1438
                                argSet[argCount] = append(argSet[argCount], "target-subtree-class="+separableSubClasses[i], "rsp-subtree-class=tagAnnotation")
1✔
1439
                                splitTargetClasses[argCount] = append(splitTargetClasses[argCount], separableSubClasses[i])
1✔
1440
                                splitRespClasses[argCount] = computeRespClasses([]string{separableSubClasses[i]})
1✔
1441
                                argCount++
1✔
1442
                        }
1443
                } else {
1✔
1444
                        argSet[defaultArgs-1] = append(argSet[defaultArgs-1], "rsp-subtree-class="+strings.Join(combinableSubClasses, ",")+",tagAnnotation")
1✔
1445
                }
1✔
1446
        }
1447
        if sub.targetFilter != "" {
2✔
1448
                targetFilterArgs := "query-target-filter=" + sub.targetFilter
1✔
1449
                if len(separableSubClasses) == 0 {
2✔
1450
                        argSet[defaultArgs-1] = append(argSet[defaultArgs-1], targetFilterArgs)
1✔
1451
                } else {
1✔
1452
                        for i := 0; i < argCount; i++ {
×
1453
                                argSet[i] = append(argSet[i], targetFilterArgs)
×
1454
                        }
×
1455
                }
1456
        }
1457

1458
        kind := "mo"
1✔
1459
        if sub.kind == apicSubClass || sub.kind == apicSubTree {
2✔
1460
                kind = "class"
1✔
1461
        }
1✔
1462

1463
        refresh_interval := ""
1✔
1464
        if conn.RefreshInterval != 0 {
2✔
1465
                refresh_interval = fmt.Sprintf("refresh-timeout=%v&",
1✔
1466
                        conn.RefreshInterval.Seconds())
1✔
1467
        }
1✔
1468
        for i := 0; i < argCount; i++ {
2✔
1469
                var apicresp ApicResponse
1✔
1470
                if !conn.doSubscribe(argSet[i], kind, value, refresh_interval, &apicresp) {
1✔
1471
                        return false
×
1472
                }
×
1473
                subId, ok := apicresp.SubscriptionId.(string)
1✔
1474
                if !ok {
1✔
1475
                        conn.log.Error("Subscription ID is not a string")
×
1476
                        return false
×
1477
                }
×
1478

1479
                conn.logger.WithFields(logrus.Fields{
1✔
1480
                        "mod":   "APICAPI",
1✔
1481
                        "value": value,
1✔
1482
                        "kind":  kind,
1✔
1483
                        "id":    subId,
1✔
1484
                        "args":  argSet[i],
1✔
1485
                }).Debug("Subscribed")
1✔
1486
                if !lockHeld {
2✔
1487
                        conn.indexMutex.Lock()
1✔
1488
                }
1✔
1489
                if argCount > defaultArgs {
2✔
1490
                        sub.childSubs[subId] = subComponent{
1✔
1491
                                targetClasses: splitTargetClasses[i],
1✔
1492
                                respClasses:   splitRespClasses[i],
1✔
1493
                        }
1✔
1494
                } else {
2✔
1495
                        conn.subscriptions.subs[value].id = subId
1✔
1496
                }
1✔
1497
                conn.subscriptions.ids[subId] = value
1✔
1498
                if !lockHeld {
2✔
1499
                        conn.indexMutex.Unlock()
1✔
1500
                }
1✔
1501
                var respObjCount int
1✔
1502
                for _, obj := range apicresp.Imdata {
2✔
1503
                        dn := obj.GetDn()
1✔
1504
                        if dn == "" {
1✔
1505
                                continue
×
1506
                        }
1507
                        if !lockHeld {
2✔
1508
                                conn.indexMutex.Lock()
1✔
1509
                        }
1✔
1510
                        subIds, found := conn.cacheDnSubIds[dn]
1✔
1511
                        if !found {
2✔
1512
                                subIds = make(map[string]bool)
1✔
1513
                                conn.cacheDnSubIds[dn] = subIds
1✔
1514
                        }
1✔
1515
                        subIds[subId] = true
1✔
1516
                        if !lockHeld {
2✔
1517
                                conn.indexMutex.Unlock()
1✔
1518
                        }
1✔
1519
                        if sub.updateHook != nil && sub.updateHook(obj) {
2✔
1520
                                continue
1✔
1521
                        }
1522

1523
                        tag := obj.GetTag()
1✔
1524
                        if !conn.isSyncTag(tag) {
1✔
1525
                                continue
×
1526
                        }
1527

1528
                        conn.logger.WithFields(logrus.Fields{
1✔
1529
                                "mod": "APICAPI",
1✔
1530
                                "dn":  dn,
1✔
1531
                                "tag": tag,
1✔
1532
                                "obj": obj,
1✔
1533
                        }).Debug("Caching")
1✔
1534
                        var count int
1✔
1535
                        prepareApicCache("", obj, &count)
1✔
1536
                        respObjCount += count
1✔
1537
                        if !lockHeld {
2✔
1538
                                conn.indexMutex.Lock()
1✔
1539
                        }
1✔
1540
                        conn.cachedState[tag] = append(conn.cachedState[tag], obj)
1✔
1541
                        if !lockHeld {
2✔
1542
                                conn.indexMutex.Unlock()
1✔
1543
                        }
1✔
1544
                }
1545
                if respObjCount >= ApicSubscriptionResponseMoMaxCount/10 {
1✔
1546
                        conn.logger.WithFields(logrus.Fields{
×
1547
                                "args":       argSet[i],
×
1548
                                "moCount":    respObjCount,
×
1549
                                "maxAllowed": ApicSubscriptionResponseMoMaxCount,
×
1550
                        }).Warning("Subscription response is significantly large. Each new object will add 2 Mos atleast and twice the number of labels on the object")
×
1551
                } else {
1✔
1552
                        conn.logger.WithFields(logrus.Fields{
1✔
1553
                                "moCount": respObjCount,
1✔
1554
                        }).Debug("ResponseObjCount")
1✔
1555
                }
1✔
1556
        }
1557

1558
        return true
1✔
1559
}
1560

1561
var tagRegexp = regexp.MustCompile(`[a-zA-Z0-9_]{1,31}-[a-f0-9]{32}`)
1562

1563
func (conn *ApicConnection) isSyncTag(tag string) bool {
1✔
1564
        return tagRegexp.MatchString(tag) &&
1✔
1565
                strings.HasPrefix(tag, conn.prefix+"-")
1✔
1566
}
1✔
1567

1568
func getRootDn(dn, rootClass string) string {
1✔
1569
        depth := classDepth[rootClass]
1✔
1570
        parts := strings.Split(dn, "/")
1✔
1571
        parts = parts[:depth]
1✔
1572
        return strings.Join(parts, "/")
1✔
1573
}
1✔
1574

1575
func (conn *ApicConnection) PostApicObjects(uri string, payload ApicSlice) error {
×
1576
        conn.log.Debug("apicIndex: ", conn.Apic[conn.ApicIndex], " uri: ", uri)
×
1577
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
×
1578
        conn.log.Debug("Apic POST url: ", url)
×
1579

×
1580
        if conn.token == "" {
×
1581
                token, err := conn.login()
×
1582
                if err != nil {
×
1583
                        conn.log.Errorf("Login: %v", err)
×
1584
                        return err
×
1585
                }
×
1586
                conn.token = token
×
1587
        }
1588

1589
        raw, err := json.Marshal(payload)
×
1590
        if err != nil {
×
1591
                conn.log.Error("Could not serialize object: ", err)
×
1592
                return err
×
1593
        }
×
1594

1595
        req, err := http.NewRequest("POST", url, bytes.NewBuffer(raw))
×
1596
        if err != nil {
×
1597
                conn.log.Error("Could not create request: ", err)
×
1598
                return err
×
1599
        }
×
1600

1601
        conn.sign(req, uri, raw)
×
1602
        req.Header.Set("Content-Type", "application/json")
×
1603
        conn.log.Infof("Post: %+v", req)
×
1604
        resp, err := conn.client.Do(req)
×
1605
        if err != nil {
×
1606
                conn.log.Error("Could not update  ", url, ": ", err)
×
1607
                return err
×
1608
        }
×
1609

1610
        complete(resp)
×
1611

×
1612
        if resp.StatusCode != http.StatusOK {
×
1613
                return fmt.Errorf("Status: %v", resp.StatusCode)
×
1614
        }
×
1615

1616
        return nil
×
1617
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc