• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

noironetworks / aci-containers / 11995

29 May 2026 05:52AM UTC coverage: 63.172% (-0.04%) from 63.215%
11995

Pull #1736

travis-pro

web-flow
Merge e7bf1ba4e into 1654f3a56
Pull Request #1736: Bump go version to 1.26.3 in .travis.yml

13318 of 21082 relevant lines covered (63.17%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.51
/pkg/apicapi/apicapi.go
1
// Copyright 2017 Cisco Systems, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Interface for connecting to APIC REST API using websockets
16
package apicapi
17

18
import (
19
        "bytes"
20
        "crypto/tls"
21
        "crypto/x509"
22
        "encoding/json"
23
        "errors"
24
        "fmt"
25
        "io"
26
        "math/rand"
27
        "net/http"
28
        "net/http/cookiejar"
29
        "regexp"
30
        "sort"
31
        "strconv"
32
        "strings"
33
        "time"
34

35
        "k8s.io/apimachinery/pkg/util/wait"
36
        "k8s.io/client-go/util/workqueue"
37

38
        "github.com/gorilla/websocket"
39
        "github.com/noironetworks/aci-containers/pkg/util"
40
        "github.com/sirupsen/logrus"
41
        "golang.org/x/time/rate"
42
)
43

44
const (
45
        // defaultConnectionRefresh is used as connection refresh interval if
46
        // RefreshInterval is set to 0
47
        defaultConnectionRefresh = 30 * time.Second
48
        cacheStaleAfterTime      = 60 * time.Minute
49
        maxRequestRetry          = 5
50
)
51

52
// ApicVersion - This global variable to be used when dealing with version-
53
// dependencies during APIC interaction. It gets filled with actual version
54
// as part of runConn()
55
var (
56
        ApicVersion = "3.1"
57
)
58

59
func complete(resp *http.Response) {
1✔
60
        if resp.StatusCode != http.StatusOK {
2✔
61
                rBody, err := io.ReadAll(resp.Body)
1✔
62
                if err != nil {
1✔
63
                        logrus.Errorf("ReadAll :%v", err)
×
64
                } else {
1✔
65
                        logrus.Infof("Resp: %s", rBody)
1✔
66
                }
1✔
67
        }
68
        resp.Body.Close()
1✔
69
}
70

71
// Yes, this is really stupid, but this is really how this works
72
func (conn *ApicConnection) sign(req *http.Request, uri string, body []byte) {
1✔
73
        if conn.signer == nil {
2✔
74
                return
1✔
75
        }
1✔
76

77
        sig, err := conn.signer.sign(req.Method, uri, body)
1✔
78
        if err != nil {
1✔
79
                conn.log.Error("Failed to sign request: ", err)
×
80
                return
×
81
        }
×
82

83
        req.Header.Set("Cookie", conn.apicSigCookie(sig, conn.token))
1✔
84
}
85

86
func (conn *ApicConnection) apicSigCookie(sig, token string) string {
1✔
87
        tokc := ""
1✔
88
        if token != "" {
2✔
89
                tokc = "; APIC-WebSocket-Session=" + token
1✔
90
        }
1✔
91
        return fmt.Sprintf("APIC-Request-Signature=%s; "+
1✔
92
                "APIC-Certificate-Algorithm=v1.0; "+
1✔
93
                "APIC-Certificate-DN=uni/userext/user-%s/usercert-%s.crt; "+
1✔
94
                "APIC-Certificate-Fingerprint=fingerprint%s",
1✔
95
                sig, conn.user, conn.user, tokc)
1✔
96
}
97

98
func (conn *ApicConnection) login() (string, error) {
1✔
99
        var path string
1✔
100
        var method string
1✔
101

1✔
102
        if conn.signer == nil {
2✔
103
                path = "aaaLogin"
1✔
104
                method = "POST"
1✔
105
        } else {
2✔
106
                path = "webtokenSession"
1✔
107
                method = "GET"
1✔
108
        }
1✔
109
        uri := fmt.Sprintf("/api/%s.json", path)
1✔
110
        var raw []byte
1✔
111
        var err error
1✔
112
        if conn.signer == nil {
2✔
113
                login := &ApicObject{
1✔
114
                        "aaaUser": &ApicObjectBody{
1✔
115
                                Attributes: map[string]interface{}{
1✔
116
                                        "name": conn.user,
1✔
117
                                        "pwd":  conn.password,
1✔
118
                                },
1✔
119
                        },
1✔
120
                }
1✔
121
                raw, err = json.Marshal(login)
1✔
122
                if err != nil {
1✔
123
                        return "", err
×
124
                }
×
125
        }
126
        conn.log.Infof("Req: %+v", &struct{ Method, URI string }{method, uri})
1✔
127
        resp, err := conn.sendHTTPSRequestToAPIC(method, uri, raw, "application/json", false, nil)
1✔
128
        if err != nil {
2✔
129
                return "", err
1✔
130
        }
1✔
131
        defer complete(resp)
1✔
132

1✔
133
        var apicresp ApicResponse
1✔
134
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
135
        if err != nil {
1✔
136
                return "", err
×
137
        }
×
138

139
        for _, obj := range apicresp.Imdata {
2✔
140
                lresp, ok := obj["aaaLogin"]
1✔
141
                if !ok {
2✔
142
                        lresp, ok = obj["webtokenSession"]
1✔
143
                        if !ok {
1✔
144
                                continue
×
145
                        }
146
                }
147

148
                token, ok := lresp.Attributes["token"]
1✔
149
                if !ok {
1✔
150
                        return "", errors.New("Token not found in login response")
×
151
                }
×
152
                stoken, isStr := token.(string)
1✔
153
                if !isStr {
1✔
154
                        return "", errors.New("Token is not a string")
×
155
                }
×
156
                return stoken, nil
1✔
157
        }
158
        return "", errors.New("Login response not found")
×
159
}
160

161
func configureTls(cert []byte) (*tls.Config, error) {
1✔
162
        if cert == nil {
1✔
163
                return &tls.Config{InsecureSkipVerify: true}, nil
×
164
        }
×
165
        pool := x509.NewCertPool()
1✔
166
        if !pool.AppendCertsFromPEM(cert) {
1✔
167
                return nil, errors.New("Could not load CA certificates")
×
168
        }
×
169
        return &tls.Config{RootCAs: pool}, nil
1✔
170
}
171

172
func New(log *logrus.Logger, apic []string, user string,
173
        password string, privKey []byte, cert []byte,
174
        prefix string, refresh int, refreshTickerAdjust int,
175
        leafRebootCheckInterval int, subscriptionDelay int, vrfTenant string,
176
        lldpIfHldr func(dn, lldpIf string) bool, cnoEnabled bool) (*ApicConnection, error) {
1✔
177
        tls, err := configureTls(cert)
1✔
178
        if err != nil {
1✔
179
                return nil, err
×
180
        }
×
181

182
        var signer *signer
1✔
183
        if privKey != nil {
2✔
184
                signer, err = newSigner(privKey)
1✔
185
                if err != nil {
1✔
186
                        return nil, err
×
187
                }
×
188
        }
189

190
        dialer := &websocket.Dialer{
1✔
191
                TLSClientConfig: tls,
1✔
192
        }
1✔
193
        tr := &http.Transport{
1✔
194
                Proxy:           http.ProxyFromEnvironment,
1✔
195
                TLSClientConfig: dialer.TLSClientConfig,
1✔
196
        }
1✔
197
        jar, err := cookiejar.New(nil)
1✔
198
        if err != nil {
1✔
199
                return nil, err
×
200
        }
×
201
        client := &http.Client{
1✔
202
                Transport: tr,
1✔
203
                Jar:       jar,
1✔
204
                Timeout:   5 * time.Minute,
1✔
205
        }
1✔
206

1✔
207
        // When prefix is very long, AciNameForKey produces a pure hash with no
1✔
208
        // prefix embedded, making DN-based prefix filtering impossible.
1✔
209
        prefixFilterable := len(prefix) <= util.AciPrefixMaxLen
1✔
210

1✔
211
        conn := &ApicConnection{
1✔
212
                ReconnectInterval:       time.Duration(5) * time.Second,
1✔
213
                ReconnectRetryLimit:     5,
1✔
214
                RefreshInterval:         time.Duration(refresh) * time.Second,
1✔
215
                LeafRebootCheckInterval: time.Duration(leafRebootCheckInterval) * time.Second,
1✔
216
                RefreshTickerAdjust:     time.Duration(refreshTickerAdjust) * time.Second,
1✔
217
                SubscriptionDelay:       time.Duration(subscriptionDelay) * time.Millisecond,
1✔
218

1✔
219
                SyncDone:         false,
1✔
220
                signer:           signer,
1✔
221
                dialer:           dialer,
1✔
222
                logger:           log,
1✔
223
                log:              log.WithField("mod", "APICAPI"),
1✔
224
                Apic:             apic,
1✔
225
                user:             user,
1✔
226
                password:         password,
1✔
227
                prefix:           prefix,
1✔
228
                prefixFilterable: prefixFilterable,
1✔
229
                client:           client,
1✔
230
                vrfTenant:        vrfTenant,
1✔
231
                lldpIfHldr:       lldpIfHldr,
1✔
232
                cnoEnabled:       cnoEnabled,
1✔
233
                subscriptions: subIndex{
1✔
234
                        subs: make(map[string]*subscription),
1✔
235
                        ids:  make(map[string]string),
1✔
236
                },
1✔
237
                syncStore:          make(map[string]*syncObj),
1✔
238
                cacheOpflexOdev:    make(map[string]struct{}),
1✔
239
                desiredState:       make(map[string]ApicSlice),
1✔
240
                desiredStateDn:     make(map[string]ApicObject),
1✔
241
                keyHashes:          make(map[string]string),
1✔
242
                containerDns:       make(map[string]bool),
1✔
243
                cachedState:        make(map[string]ApicSlice),
1✔
244
                cacheDnSubIds:      make(map[string]map[string]bool),
1✔
245
                pendingSubDnUpdate: make(map[string]pendingChange),
1✔
246
                CachedSubnetDns:    make(map[string]string),
1✔
247
                cachedLLDPIfs:      make(map[string]string),
1✔
248
                cachedLeafDns:      make(map[string]*leafCacheEntry),
1✔
249
        }
1✔
250
        return conn, nil
1✔
251
}
252

253
func (conn *ApicConnection) addToQueue(queue workqueue.RateLimitingInterface, item interface{}) {
1✔
254
        if conn.cnoEnabled || conn.DisableRateLimit {
1✔
255
                queue.Add(item)
×
256
        } else {
1✔
257
                queue.AddRateLimited(item)
1✔
258
        }
1✔
259
}
260

261
func (conn *ApicConnection) handleSocketUpdate(apicresp *ApicResponse) {
1✔
262
        var subIds []string
1✔
263
        switch ids := apicresp.SubscriptionId.(type) {
1✔
264
        case string:
×
265
                subIds = append(subIds, ids)
×
266
        case []interface{}:
1✔
267
                for _, id := range ids {
2✔
268
                        subIds = append(subIds, id.(string))
1✔
269
                }
1✔
270
        }
271

272
        // Check if any subscription in this message has hooks (updateHook/deleteHook).
273
        // Subscriptions with hooks handle all notifications regardless of ownership
274
        // (e.g., fvSubnet class subscription tracking subnets across all BDs).
275
        // The prefix filter must be bypassed for these.
276
        hasHooks := false
1✔
277
        if !conn.cnoEnabled && conn.prefixFilterable && conn.vrfTenant != "" {
2✔
278
                conn.indexMutex.Lock()
1✔
279
                for _, id := range subIds {
2✔
280
                        if value, ok := conn.subscriptions.ids[id]; ok {
2✔
281
                                if sub, ok := conn.subscriptions.subs[value]; ok {
2✔
282
                                        if sub.updateHook != nil || sub.deleteHook != nil {
2✔
283
                                                hasHooks = true
1✔
284
                                                break
1✔
285
                                        }
286
                                }
287
                        }
288
                }
289
                conn.indexMutex.Unlock()
1✔
290
        }
291

292
        for _, obj := range apicresp.Imdata {
2✔
293
                for key, body := range obj {
2✔
294
                        if dn, ok := body.Attributes["dn"].(string); ok {
2✔
295
                                if status, isStr := body.Attributes["status"].(string); isStr {
2✔
296
                                        dnSlice := strings.Split(dn, "/")
1✔
297
                                        // Filter out notifications from other controllers sharing the same vrfTenant.
1✔
298
                                        // In normal mode, all MOs created by this controller have names generated via
1✔
299
                                        // AciNameForKey(prefix, ktype, key) which always produces "{prefix}_{ktype}_{...}",
1✔
300
                                        // so the DN will contain "{prefix}_" in one of its segments. Skip notifications
1✔
301
                                        // under our vrfTenant that don't carry our prefix — they belong to other controllers.
1✔
302
                                        // This filter is bypassed in CNO/chained mode, when AciNameForKey produces
1✔
303
                                        // pure-hash names (prefixFilterable is false), and when the subscription has
1✔
304
                                        // hooks that need to process all notifications regardless of ownership.
1✔
305
                                        if !hasHooks && !conn.cnoEnabled && conn.prefixFilterable &&
1✔
306
                                                len(dnSlice) > 1 && conn.vrfTenant != "" &&
1✔
307
                                                dnSlice[1] == "tn-"+conn.vrfTenant &&
1✔
308
                                                !strings.Contains(dn, "-"+conn.prefix+"_") {
1✔
309
                                                conn.log.Debug("Skipping websocket notification for dn: ", dn, " class: ", key)
×
310
                                                continue
×
311
                                        }
312
                                        var pendingKind int
1✔
313
                                        if status == "deleted" {
2✔
314
                                                pendingKind = pendingChangeDelete
1✔
315
                                        } else {
2✔
316
                                                pendingKind = pendingChangeUpdate
1✔
317
                                        }
1✔
318
                                        conn.indexMutex.Lock()
1✔
319

1✔
320
                                        conn.logger.WithFields(logrus.Fields{
1✔
321
                                                "mod": "APICAPI",
1✔
322
                                                "dn":  obj.GetDn(),
1✔
323
                                                "obj": obj,
1✔
324
                                        }).Debug("Processing websocket notification for:")
1✔
325

1✔
326
                                        conn.pendingSubDnUpdate[dn] = pendingChange{
1✔
327
                                                kind:    pendingKind,
1✔
328
                                                subIds:  subIds,
1✔
329
                                                isDirty: false,
1✔
330
                                        }
1✔
331
                                        if key == "opflexODev" && conn.odevQueue != nil {
2✔
332
                                                if conn.FilterOpflexDevice {
1✔
333
                                                        if conn.isPresentInOpflexOdevCache(dn, status, obj) {
×
334
                                                                conn.log.Info("Adding dn to odevQueue: ", dn)
×
335
                                                                conn.addToQueue(conn.odevQueue, dn)
×
336
                                                        }
×
337
                                                } else {
1✔
338
                                                        conn.log.Info("Adding dn to odevQueue: ", dn)
1✔
339
                                                        conn.addToQueue(conn.odevQueue, dn)
1✔
340
                                                }
1✔
341
                                        } else if isPriorityObject(dn) {
1✔
342
                                                conn.log.Debug("Adding dn to priorityQueue: ", dn)
×
343
                                                conn.addToQueue(conn.priorityQueue, dn)
×
344
                                        } else if isLLDPIfObject(dn) && conn.lldpIfQueue != nil {
1✔
345
                                                if lldpIf, ok := body.Attributes["portDesc"].(string); ok {
×
346
                                                        conn.cachedLLDPIfs[dn] = lldpIf
×
347
                                                }
×
348
                                                conn.log.Debug("Adding dn to lldpIfQueue: ", dn)
×
349
                                                conn.addToQueue(conn.lldpIfQueue, dn)
×
350
                                        } else if conn.deltaQueue != nil {
2✔
351
                                                conn.addToQueue(conn.deltaQueue, dn)
1✔
352
                                        }
1✔
353
                                        conn.indexMutex.Unlock()
1✔
354
                                }
355
                        }
356
                }
357
        }
358
}
359

360
func (conn *ApicConnection) restart() {
1✔
361
        conn.indexMutex.Lock()
1✔
362
        if conn.restartCh != nil {
2✔
363
                conn.log.Debug("Restarting connection")
1✔
364
                close(conn.restartCh)
1✔
365
                conn.restartCh = nil
1✔
366
        }
1✔
367
        conn.indexMutex.Unlock()
1✔
368
}
369

370
func (conn *ApicConnection) handleQueuedDn(dn string) bool {
1✔
371
        var respClasses []string
1✔
372
        var updateHandlers []ApicObjectHandler
1✔
373
        var deleteHandlers []ApicDnHandler
1✔
374
        var rootDn string
1✔
375

1✔
376
        handleId := func(id string) {
2✔
377
                conn.indexMutex.Lock()
1✔
378
                if value, ok := conn.subscriptions.ids[id]; ok {
2✔
379
                        if sub, ok := conn.subscriptions.subs[value]; ok {
2✔
380
                                if subComp, ok := sub.childSubs[id]; ok {
1✔
381
                                        respClasses =
×
382
                                                append(respClasses, subComp.respClasses...)
×
383
                                } else {
1✔
384
                                        respClasses =
1✔
385
                                                append(respClasses, sub.respClasses...)
1✔
386
                                }
1✔
387
                                if sub.updateHook != nil {
2✔
388
                                        updateHandlers = append(updateHandlers, sub.updateHook)
1✔
389
                                }
1✔
390
                                if sub.deleteHook != nil {
2✔
391
                                        deleteHandlers = append(deleteHandlers, sub.deleteHook)
1✔
392
                                }
1✔
393

394
                                if sub.kind == apicSubTree {
1✔
395
                                        rootDn = getRootDn(dn, value)
×
396
                                }
×
397
                        }
398
                } else {
×
399
                        conn.log.Warning("Unexpected subscription: ", id)
×
400
                }
×
401
                conn.indexMutex.Unlock()
1✔
402
        }
403

404
        var requeue bool
1✔
405
        conn.indexMutex.Lock()
1✔
406
        pending, hasPendingChange := conn.pendingSubDnUpdate[dn]
1✔
407
        conn.pendingSubDnUpdate[dn] = pendingChange{isDirty: true}
1✔
408
        obj, hasDesiredState := conn.desiredStateDn[dn]
1✔
409
        conn.indexMutex.Unlock()
1✔
410

1✔
411
        if hasPendingChange {
2✔
412
                for _, id := range pending.subIds {
2✔
413
                        handleId(id)
1✔
414
                }
1✔
415
        }
416

417
        if rootDn == "" {
2✔
418
                rootDn = dn
1✔
419
        }
1✔
420

421
        if hasDesiredState {
2✔
422
                if hasPendingChange {
2✔
423
                        if pending.kind == pendingChangeDelete {
2✔
424
                                conn.logger.WithFields(logrus.Fields{"mod": "APICAPI", "DN": dn}).
1✔
425
                                        Warning("Restoring unexpectedly deleted" +
1✔
426
                                                " ACI object")
1✔
427
                                requeue = conn.postDn(dn, obj)
1✔
428
                        } else {
2✔
429
                                conn.log.Debug("getSubtreeDn for:", rootDn)
1✔
430
                                conn.getSubtreeDn(rootDn, respClasses, updateHandlers)
1✔
431
                        }
1✔
432
                } else {
1✔
433
                        requeue = conn.postDn(dn, obj)
1✔
434
                }
1✔
435
        } else {
1✔
436
                if hasPendingChange {
2✔
437
                        if pending.kind == pendingChangeSyncDelete {
2✔
438
                                // Ownership already verified by sync/reconcile;
1✔
439
                                // delete without an extra APIC lookup.
1✔
440
                                requeue = conn.Delete(dn)
1✔
441
                        } else {
2✔
442
                                if pending.kind == pendingChangeDelete {
2✔
443
                                        for _, handler := range deleteHandlers {
2✔
444
                                                handler(dn)
1✔
445
                                        }
1✔
446
                                }
447

448
                                if (pending.kind != pendingChangeDelete) || (dn != rootDn) {
2✔
449
                                        conn.log.Debug("getSubtreeDn for:", rootDn)
1✔
450
                                        conn.getSubtreeDn(rootDn, respClasses, updateHandlers)
1✔
451
                                }
1✔
452
                        }
453
                } else {
×
454
                        if conn.isOwnedByController(dn) {
×
455
                                requeue = conn.Delete(dn)
×
456
                        } else {
×
457
                                conn.log.Debug("Skipping delete for unowned dn: ", dn)
×
458
                        }
×
459
                }
460
        }
461

462
        return requeue
1✔
463
}
464

465
func (conn *ApicConnection) processQueue(queue workqueue.RateLimitingInterface,
466
        queueStop <-chan struct{}, name string) {
1✔
467
        go wait.Until(func() {
2✔
468
                conn.log.Debug("Running processQueue for queue ", name)
1✔
469
                for {
2✔
470
                        dn, quit := queue.Get()
1✔
471
                        if quit {
2✔
472
                                break
1✔
473
                        }
474
                        conn.log.Debug("Processing queue for:", dn)
1✔
475
                        var requeue bool
1✔
476
                        if dn, ok := dn.(string); ok {
2✔
477
                                requeue = conn.handleQueuedDn(dn)
1✔
478
                        }
1✔
479
                        if requeue {
1✔
480
                                queue.AddRateLimited(dn)
×
481
                        } else {
1✔
482
                                conn.indexMutex.Lock()
1✔
483
                                if conn.pendingSubDnUpdate[dn.(string)].isDirty {
2✔
484
                                        delete(conn.pendingSubDnUpdate, dn.(string))
1✔
485
                                }
1✔
486
                                conn.indexMutex.Unlock()
1✔
487
                                queue.Forget(dn)
1✔
488
                        }
489
                        queue.Done(dn)
1✔
490
                }
491
        }, time.Second, queueStop)
492
        <-queueStop
1✔
493
        queue.ShutDown()
1✔
494
}
495

496
func (conn *ApicConnection) processLLDPIfQueue(queue workqueue.RateLimitingInterface,
497
        handler func(dn, lldpIf string) bool, stopCh <-chan struct{}) {
1✔
498
        go wait.Until(func() {
2✔
499
                for {
2✔
500
                        key, quit := queue.Get()
1✔
501
                        if quit {
2✔
502
                                break
1✔
503
                        }
504
                        var requeue bool
×
505
                        switch key := key.(type) {
×
506
                        case chan struct{}:
×
507
                                close(key)
×
508
                        case string:
×
509
                                if handler != nil {
×
510
                                        if lldpIf, ok := conn.cachedLLDPIfs[key]; ok {
×
511
                                                requeue = handler(key, lldpIf)
×
512
                                        }
×
513
                                }
514
                        }
515
                        if requeue {
×
516
                                queue.AddRateLimited(key)
×
517
                        } else {
×
518
                                queue.Forget(key)
×
519
                        }
×
520
                        queue.Done(key)
×
521

522
                }
523
        }, time.Second, stopCh)
524
        <-stopCh
1✔
525
        queue.ShutDown()
1✔
526
}
527

528
type fullSync struct{}
529

530
func (conn *ApicConnection) UnsubscribeImmediateDnLocked(dn string,
531
        targetClasses []string) {
×
532
        // Post local delete, subscription will not be refreshed
×
533
        // after refresh time-out
×
534
        delete(conn.subscriptions.subs, dn)
×
535
}
×
536

537
func (conn *ApicConnection) AddImmediateSubscriptionDnLocked(dn string,
538
        targetClasses []string, updateHook ApicObjectHandler, deleteHook ApicDnHandler) bool {
×
539
        conn.logger.WithFields(logrus.Fields{
×
540
                "mod": "APICAPI",
×
541
                "dn":  dn,
×
542
        }).Debug("Adding Subscription for Dn")
×
543

×
544
        conn.subscriptions.subs[dn] = &subscription{
×
545
                kind:          apicSubDn,
×
546
                childSubs:     make(map[string]subComponent),
×
547
                targetClasses: targetClasses,
×
548
                respClasses:   computeRespClasses(targetClasses),
×
549
        }
×
550
        if updateHook != nil {
×
551
                conn.subscriptions.subs[dn].updateHook = updateHook
×
552
        }
×
553
        if deleteHook != nil {
×
554
                conn.subscriptions.subs[dn].deleteHook = deleteHook
×
555
        }
×
556
        if conn.connection != nil {
×
557
                return conn.subscribe(dn, conn.subscriptions.subs[dn], true)
×
558
        } else {
×
559
                return false
×
560
        }
×
561
}
562

563
func (conn *ApicConnection) runConn(stopCh <-chan struct{}) {
1✔
564
        done := make(chan struct{})
1✔
565
        restart := make(chan struct{})
1✔
566
        queueStop := make(chan struct{})
1✔
567
        odevQueueStop := make(chan struct{})
1✔
568
        priorityQueueStop := make(chan struct{})
1✔
569
        syncHook := make(chan fullSync, 1)
1✔
570
        conn.restartCh = restart
1✔
571

1✔
572
        go func() {
2✔
573
                defer conn.connection.Close()
1✔
574
                defer close(done)
1✔
575

1✔
576
                for {
2✔
577
                        var apicresp ApicResponse
1✔
578
                        err := conn.connection.ReadJSON(&apicresp)
1✔
579
                        var closeErr *websocket.CloseError
1✔
580
                        if errors.As(err, &closeErr) {
2✔
581
                                conn.log.Info("Websocket connection closed: ", closeErr.Code)
1✔
582
                                conn.restart()
1✔
583
                                break
1✔
584
                        } else if err != nil {
1✔
585
                                conn.log.Error("Could not read web socket message:", err)
×
586
                                conn.restart()
×
587
                                break
×
588
                        } else {
1✔
589
                                conn.handleSocketUpdate(&apicresp)
1✔
590
                        }
1✔
591
                }
592
        }()
593

594
        if conn.VMMLiteSyncHook != nil {
1✔
595
                conn.VMMLiteSyncHook()
×
596
        }
×
597
        conn.indexMutex.Lock()
1✔
598
        oldState := conn.cacheDnSubIds
1✔
599
        conn.cachedState = make(map[string]ApicSlice)
1✔
600
        conn.cacheDnSubIds = make(map[string]map[string]bool)
1✔
601
        conn.deltaQueue = workqueue.NewNamedRateLimitingQueue(
1✔
602
                workqueue.NewMaxOfRateLimiter(
1✔
603
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
604
                                10*time.Second),
1✔
605
                        &workqueue.BucketRateLimiter{
1✔
606
                                Limiter: rate.NewLimiter(rate.Limit(10), int(30)),
1✔
607
                        },
1✔
608
                ),
1✔
609
                "delta")
1✔
610
        go conn.processQueue(conn.deltaQueue, queueStop, "delta")
1✔
611
        conn.odevQueue = workqueue.NewNamedRateLimitingQueue(
1✔
612
                workqueue.NewMaxOfRateLimiter(
1✔
613
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
614
                                10*time.Second),
1✔
615
                        &workqueue.BucketRateLimiter{
1✔
616
                                Limiter: rate.NewLimiter(rate.Limit(10), int(30)),
1✔
617
                        },
1✔
618
                ),
1✔
619
                "odev")
1✔
620
        go conn.processQueue(conn.odevQueue, odevQueueStop, "odev")
1✔
621
        conn.priorityQueue = workqueue.NewNamedRateLimitingQueue(
1✔
622
                workqueue.NewMaxOfRateLimiter(
1✔
623
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
624
                                10*time.Second),
1✔
625
                        &workqueue.BucketRateLimiter{
1✔
626
                                Limiter: rate.NewLimiter(rate.Limit(10), int(30)),
1✔
627
                        },
1✔
628
                ),
1✔
629
                "priority")
1✔
630
        go conn.processQueue(conn.priorityQueue, priorityQueueStop, "priority")
1✔
631
        conn.lldpIfQueue = workqueue.NewNamedRateLimitingQueue(
1✔
632
                workqueue.NewMaxOfRateLimiter(
1✔
633
                        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
1✔
634
                                10*time.Second),
1✔
635
                        &workqueue.BucketRateLimiter{
1✔
636
                                Limiter: rate.NewLimiter(rate.Limit(10), int(30)),
1✔
637
                        },
1✔
638
                ),
1✔
639
                "lldpIf")
1✔
640
        go conn.processLLDPIfQueue(conn.lldpIfQueue,
1✔
641
                conn.lldpIfHldr, stopCh)
1✔
642
        conn.indexMutex.Unlock()
1✔
643

1✔
644
        refreshInterval := conn.RefreshInterval
1✔
645
        if refreshInterval == 0 {
1✔
646
                refreshInterval = defaultConnectionRefresh
×
647
        }
×
648
        // Adjust refreshTickerInterval.
649
        // To refresh the subscriptions early than actual refresh timeout value
650
        refreshTickerInterval := refreshInterval - conn.RefreshTickerAdjust
1✔
651
        refreshTicker := time.NewTicker(refreshTickerInterval)
1✔
652
        defer refreshTicker.Stop()
1✔
653

1✔
654
        leafRebootCheckTicker := time.NewTicker(conn.LeafRebootCheckInterval)
1✔
655
        if conn.cnoEnabled {
1✔
656
                leafRebootCheckTicker.Stop()
×
657
        } else {
1✔
658
                conn.initializeLeafDnCache()
1✔
659
                defer leafRebootCheckTicker.Stop()
1✔
660
        }
1✔
661
        var hasErr bool
1✔
662
        for value, object := range conn.syncStore {
1✔
663
                if !(conn.getSyncObject(value, object)) {
×
664
                        hasErr = true
×
665
                        conn.restart()
×
666
                        break
×
667
                }
668
        }
669
        if !hasErr {
2✔
670
                for value, subscription := range conn.subscriptions.subs {
2✔
671
                        if !(conn.subscribe(value, subscription, false)) {
1✔
672
                                hasErr = true
×
673
                                conn.restart()
×
674
                                break
×
675
                        }
676
                }
677
        }
678
        if !hasErr {
2✔
679
                conn.checkDeletes(oldState)
1✔
680
                go func() {
2✔
681
                        if conn.FullSyncHook != nil {
1✔
682
                                conn.FullSyncHook()
×
683
                        }
×
684
                        syncHook <- fullSync{}
1✔
685
                }()
686
        }
687

688
        // Get APIC version if connection restarts
689
        if conn.version == "" && conn.checkVersion {
1✔
690
                go func() {
×
691
                        version, err := conn.GetVersion()
×
692
                        if err != nil {
×
693
                                conn.log.Error("Error while getting APIC version: ", err, " Restarting connection...")
×
694
                                conn.restart()
×
695
                        } else {
×
696
                                conn.log.Debug("Cached version:", conn.CachedVersion, " New version:", version)
×
697
                                if ApicVersion != version {
×
698
                                        ApicVersion = version
×
699
                                        if ApicVersion >= "6.0(4c)" {
×
700
                                                metadata["fvBD"].attributes["serviceBdRoutingDisable"] = "no"
×
701
                                        } else {
×
702
                                                delete(metadata["fvBD"].attributes, "serviceBdRoutingDisable")
×
703
                                        }
×
704
                                        if conn.VersionUpdateHook != nil {
×
705
                                                conn.VersionUpdateHook()
×
706
                                        }
×
707
                                }
708
                                conn.CachedVersion = version
×
709
                        }
710
                }()
711
        }
712

713
        closeConn := func(stop bool) {
2✔
714
                close(queueStop)
1✔
715
                close(odevQueueStop)
1✔
716

1✔
717
                conn.indexMutex.Lock()
1✔
718
                conn.deltaQueue = nil
1✔
719
                conn.odevQueue = nil
1✔
720
                conn.priorityQueue = nil
1✔
721
                conn.lldpIfQueue = nil
1✔
722
                conn.stopped = stop
1✔
723
                conn.syncEnabled = false
1✔
724
                conn.subscriptions.ids = make(map[string]string)
1✔
725
                conn.version = ""
1✔
726
                conn.indexMutex.Unlock()
1✔
727

1✔
728
                conn.log.Debug("Shutting down web socket")
1✔
729
                err := conn.connection.WriteMessage(websocket.CloseMessage,
1✔
730
                        websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
1✔
731
                if err != nil {
1✔
732
                        conn.log.Error("Error while closing socket: ", err)
×
733
                } else {
1✔
734
                        select {
1✔
735
                        case <-done:
1✔
736
                        case <-time.After(time.Second):
×
737
                        }
738
                }
739
                conn.connection.Close()
1✔
740
        }
741

742
loop:
1✔
743
        for {
2✔
744
                select {
1✔
745
                case <-syncHook:
1✔
746
                        conn.fullSync()
1✔
747
                case <-refreshTicker.C:
1✔
748
                        conn.refresh()
1✔
749
                case <-leafRebootCheckTicker.C:
×
750
                        conn.checkLeafReboot()
×
751
                case <-restart:
1✔
752
                        closeConn(false)
1✔
753
                        break loop
1✔
754
                case <-stopCh:
1✔
755
                        closeConn(true)
1✔
756
                        break loop
1✔
757
                }
758
        }
759

760
        conn.log.Debug("Exiting websocket handler")
1✔
761
}
762

763
// This function should only to be called before we make the first connection to APIC.
764
// Use the cached Apic version when determining the version elsewhere. This can lead to inconsistent tokens.
765
func (conn *ApicConnection) GetVersion() (string, error) {
1✔
766
        versionMo := "firmwareCtrlrRunning"
1✔
767

1✔
768
        if len(conn.Apic) == 0 {
1✔
769
                return "", errors.New("No APIC configuration")
×
770
        }
×
771

772
        conn.checkVersion = true // enable version check on websocket reconnect
1✔
773
        // To Handle unit-tests
1✔
774
        if strings.Contains(conn.Apic[conn.ApicIndex], "127.0.0.1") {
2✔
775
                conn.version = "4.2(4i)"
1✔
776
                conn.SnatPbrFltrChain = true
1✔
777
                conn.log.Debug("Returning APIC version 4.2(4i) for test server")
1✔
778
                return conn.version, nil
1✔
779
        }
1✔
780

781
        uri := fmt.Sprintf("/api/node/class/%s.json?&", versionMo)
×
782

×
783
        retries := 0
×
784
        for conn.version == "" {
×
785
                if retries <= conn.ReconnectRetryLimit {
×
786
                        // Wait before Retry.
×
787
                        time.Sleep(conn.ReconnectInterval)
×
788
                        retries++
×
789
                } else {
×
790
                        return "", fmt.Errorf("Failed to get APIC version after %d retries", retries)
×
791
                }
×
792

793
                token, err := conn.login()
×
794
                if err != nil {
×
795
                        conn.log.Error("Failed to log into APIC: ", err)
×
796
                        continue
×
797
                }
798
                conn.token = token
×
799

×
800
                resp, err := conn.sendHTTPSRequestToAPIC("GET", uri, nil, "", false, nil)
×
801
                if err != nil {
×
802
                        continue
×
803
                }
804
                defer complete(resp)
×
805

×
806
                var apicresp ApicResponse
×
807
                err = json.NewDecoder(resp.Body).Decode(&apicresp)
×
808
                if err != nil {
×
809
                        conn.log.Error("Could not parse APIC response: ", err)
×
810
                        continue
×
811
                }
812
                for _, obj := range apicresp.Imdata {
×
813
                        vresp := obj["firmwareCtrlrRunning"]
×
814
                        version, ok := vresp.Attributes["version"]
×
815
                        if !ok {
×
816
                                conn.log.Debug("No version attribute in the response??!")
×
817
                                conn.logger.WithFields(logrus.Fields{
×
818
                                        "mod":                            "APICAPI",
×
819
                                        "firmwareCtrlrRunning":           vresp,
×
820
                                        "firmwareCtrlRunning Attributes": vresp.Attributes,
×
821
                                }).Debug("Response:")
×
822
                        } else {
×
823
                                switch version := version.(type) {
×
824
                                default:
×
825
                                case string:
×
826
                                        version_split := strings.Split(version, "(")
×
827
                                        version_number, err := strconv.ParseFloat(version_split[0], 64)
×
828
                                        conn.log.Info("Actual APIC version:", version, " Stripped out version:", version_number)
×
829
                                        if err == nil {
×
830
                                                conn.version = version //return the actual version
×
831
                                        }
×
832
                                }
833
                        }
834
                }
835
        }
836
        return conn.version, nil
×
837
}
838

839
func (conn *ApicConnection) Run(stopCh <-chan struct{}) {
1✔
840
        if len(conn.Apic) == 0 {
1✔
841
                conn.log.Warning("APIC connection not configured")
×
842
                return
×
843
        }
×
844

845
        if conn.version >= "6.0(4c)" {
1✔
846
                metadata["fvBD"].attributes["serviceBdRoutingDisable"] = "no"
×
847
        }
×
848

849
        for !conn.stopped {
2✔
850
                func() {
2✔
851
                        defer func() {
2✔
852
                                conn.ApicIndex = (conn.ApicIndex + 1) % len(conn.Apic)
1✔
853
                                time.Sleep(conn.ReconnectInterval)
1✔
854
                        }()
1✔
855

856
                        conn.logger.WithFields(logrus.Fields{
1✔
857
                                "mod":  "APICAPI",
1✔
858
                                "host": conn.Apic[conn.ApicIndex],
1✔
859
                        }).Info("Connecting to APIC")
1✔
860

1✔
861
                        for dn := range conn.subscriptions.subs {
2✔
862
                                conn.subscriptions.subs[dn].childSubs = make(map[string]subComponent)
1✔
863
                        }
1✔
864
                        conn.subscriptions.ids = make(map[string]string)
1✔
865

1✔
866
                        token, err := conn.login()
1✔
867
                        if err != nil {
2✔
868
                                conn.log.Error("Failed to log into APIC: ", err)
1✔
869
                                return
1✔
870
                        }
1✔
871
                        conn.token = token
1✔
872

1✔
873
                        uri := fmt.Sprintf("/socket%s", token)
1✔
874
                        url := fmt.Sprintf("wss://%s%s",
1✔
875
                                conn.Apic[conn.ApicIndex], uri)
1✔
876
                        header := make(http.Header)
1✔
877
                        if conn.signer != nil {
2✔
878
                                sig, err := conn.signer.sign("GET", uri, nil)
1✔
879
                                if err != nil {
1✔
880
                                        conn.log.Error("Failed to sign request: ", err)
×
881
                                        return
×
882
                                }
×
883
                                header.Set("Cookie", conn.apicSigCookie(sig, token))
1✔
884
                        }
885

886
                        conn.connection, _, err = conn.dialer.Dial(url, header)
1✔
887
                        if err != nil {
1✔
888
                                conn.log.Error("Failed to open APIC websocket: ", err)
×
889
                                return
×
890
                        }
×
891
                        conn.log.Info("Websocket connected!")
1✔
892
                        conn.runConn(stopCh)
1✔
893
                }()
894
        }
895
}
896

897
func (conn *ApicConnection) refresh() {
1✔
898
        if conn.signer == nil {
2✔
899
                uri := "/api/aaaRefresh.json"
1✔
900
                resp, err := conn.sendHTTPSRequestToAPIC("GET", uri, nil, "", true, nil)
1✔
901
                if err != nil {
2✔
902
                        return
1✔
903
                }
1✔
904
                complete(resp)
1✔
905
                conn.log.Debugf("Refresh: url %v", resp.Request.URL)
1✔
906
        }
907

908
        for _, sub := range conn.subscriptions.subs {
2✔
909
                refreshId := func(id string) {
2✔
910
                        uri := fmt.Sprintf("/api/subscriptionRefresh.json?id=%s", id)
1✔
911
                        resp, err := conn.sendHTTPSRequestToAPIC("GET", uri, nil, "", true, nil)
1✔
912
                        if err != nil {
2✔
913
                                return
1✔
914
                        }
1✔
915
                        complete(resp)
×
916
                        conn.log.Debugf("Refresh sub: url %v", resp.Request.URL)
×
917
                        time.Sleep(conn.SubscriptionDelay)
×
918
                }
919
                if len(sub.childSubs) > 0 {
1✔
920
                        for id := range sub.childSubs {
×
921
                                refreshId(id)
×
922
                        }
×
923
                } else {
1✔
924
                        refreshId(sub.id)
1✔
925
                }
1✔
926
        }
927
}
928

929
func (conn *ApicConnection) getLeafDns() ApicSlice {
1✔
930
        uri := "/api/node/class/topSystem.json?query-target-filter=and(eq(topSystem.role,\"leaf\"))"
1✔
931
        apicresp, err := conn.GetApicResponse(uri)
1✔
932
        if err != nil {
2✔
933
                conn.log.Errorf("Unable to get leaf details. Error while getting apic response: %v", err)
1✔
934
                return nil
1✔
935
        }
1✔
936
        if len(apicresp.Imdata) == 0 {
×
937
                conn.log.Warnf("Zero leaf details found in apic response")
×
938
        }
×
939
        return apicresp.Imdata
×
940
}
941

942
func (conn *ApicConnection) initializeLeafDnCache() {
1✔
943
        imData := conn.getLeafDns()
1✔
944
        for _, obj := range imData {
1✔
945
                leafDn := obj.GetDn()
×
946
                lastRebootTimeStr := obj.GetAttrStr("lastRebootTime")
×
947
                lastRebootTime, err := time.Parse(time.RFC3339, lastRebootTimeStr)
×
948
                if err != nil {
×
949
                        conn.log.Warnf("Skipping leaf %v with unparsable lastRebootTime: %v", leafDn, err)
×
950
                        continue
×
951
                }
952
                conn.log.Debugf("Initializing cache with leaf %v with lastRebootTime: %v", leafDn, lastRebootTime)
×
953
                conn.cachedLeafDns[leafDn] = &leafCacheEntry{
×
954
                        lastRebootTime: lastRebootTime,
×
955
                        lastSeenTime:   time.Now(),
×
956
                }
×
957
        }
958
}
959

960
func (conn *ApicConnection) checkLeafReboot() {
×
961
        reboot := false
×
962
        imData := conn.getLeafDns()
×
963
        for _, obj := range imData {
×
964
                leafDn := obj.GetDn()
×
965
                lastRebootTimeStr := obj.GetAttrStr("lastRebootTime")
×
966
                lastRebootTime, err := time.Parse(time.RFC3339, lastRebootTimeStr)
×
967
                if err != nil {
×
968
                        conn.log.Warnf("Skipping leaf %v with unparsable lastRebootTime: %v", leafDn, err)
×
969
                        continue
×
970
                }
971
                if cacheEntry, present := conn.cachedLeafDns[leafDn]; present {
×
972
                        if !cacheEntry.lastRebootTime.Equal(lastRebootTime) {
×
973
                                conn.log.Debugf("Detected reboot of leaf %v. Current lastRebootTime:%v, cached lastRebootTime:%v", leafDn, lastRebootTime, cacheEntry.lastRebootTime)
×
974
                                conn.cachedLeafDns[leafDn].lastRebootTime = lastRebootTime
×
975
                                reboot = true
×
976
                        }
×
977
                } else {
×
978
                        conn.log.Debugf("Detected new leaf %v. Current lastRebootTime:%v", leafDn, lastRebootTime)
×
979
                        conn.cachedLeafDns[leafDn] = &leafCacheEntry{
×
980
                                lastRebootTime: lastRebootTime,
×
981
                        }
×
982
                        reboot = true
×
983
                }
×
984
                conn.cachedLeafDns[leafDn].lastSeenTime = time.Now()
×
985
        }
986

987
        for leafDn, cacheEntry := range conn.cachedLeafDns {
×
988
                if time.Since(cacheEntry.lastSeenTime) > cacheStaleAfterTime {
×
989
                        delete(conn.cachedLeafDns, leafDn)
×
990
                        conn.log.Debugf("Removing stale leaf cache entry: %v (last seen %v ago)", leafDn, time.Since(cacheEntry.lastSeenTime))
×
991
                }
×
992
        }
993
        if reboot {
×
994
                conn.log.Infof("Restarting APIC connection because of leaf reboot")
×
995
                conn.restart()
×
996
        }
×
997
}
998

999
// sendHTTPSRequestToAPIC sends an HTTPS request to APIC.
1000
// - Retries on 503 when EnableRequestRetry is true, up to maxRequestRetry times, using exponential backoff with jitter.
1001
// - If restart is true, the connection is restarted on request/transport errors, non-2xx responses, and after retry exhaustion.
1002
// - Status codes in exceptionStatusCode are returned as-is for the caller to handle.
1003
// Sets the Content-Type header when provided; pass nil for body and "" for contentType to omit them.
1004
// Returns a non-nil error for non-2xx responses (except those listed in exceptionStatusCode).
1005
func (conn *ApicConnection) sendHTTPSRequestToAPIC(method string, uri string, body []byte, contentType string,
1006
        restart bool, exceptionStatusCode []int) (*http.Response, error) {
1✔
1007
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1✔
1008
        retry := 0
1✔
1009

1✔
1010
        for {
2✔
1011
                // Construct new request for each retry.
1✔
1012
                reqBody := bytes.NewReader(body)
1✔
1013
                req, err := http.NewRequest(method, url, reqBody)
1✔
1014
                if err != nil {
1✔
1015
                        conn.log.Error("Could not create request:", err)
×
1016
                        if restart {
×
1017
                                conn.restart()
×
1018
                        }
×
1019
                        return nil, err
×
1020
                }
1021
                if contentType != "" {
2✔
1022
                        req.Header.Set("Content-Type", contentType)
1✔
1023
                }
1✔
1024
                conn.sign(req, uri, body)
1✔
1025

1✔
1026
                if retry > 0 {
1✔
1027
                        conn.log.Infof("Retrying request : %s, Attempt %d ", req.URL.String(), retry)
×
1028
                }
×
1029

1030
                resp, err := conn.client.Do(req)
1✔
1031
                if err != nil {
2✔
1032
                        conn.log.Error("Failed to send request to APIC: ", err)
1✔
1033
                        if restart {
2✔
1034
                                conn.restart()
1✔
1035
                        }
1✔
1036
                        return nil, err
1✔
1037
                }
1038

1039
                // If exceptionStatusCode is passed by caller function,
1040
                // the caller is responsible for handling those status codes.
1041
                for _, exception := range exceptionStatusCode {
2✔
1042
                        if exception == resp.StatusCode {
1✔
1043
                                return resp, nil
×
1044
                        }
×
1045
                }
1046

1047
                // Retry on 503 Service Unavailable
1048
                if conn.EnableRequestRetry && resp.StatusCode == 503 {
1✔
1049
                        conn.log.Error("Recieved Service Unavailable Response for url : ", req.URL.String())
×
1050
                        retry++
×
1051
                        if retry > maxRequestRetry {
×
1052
                                conn.log.Errorf("Maximum retry for %s exceeded", req.URL.String())
×
1053
                                complete(resp)
×
1054
                                if restart {
×
1055
                                        conn.restart()
×
1056
                                }
×
1057
                                return resp, fmt.Errorf("Got error response from APIC")
×
1058
                        }
1059
                        complete(resp)
×
1060
                        // Exponential backoff with jitter
×
1061
                        // With default minBase=20s (configurable via RequestRetryDelayBase):
×
1062
                        //   retry intervals: [20s, 40s], [40s, 60s], [80s, 100s], [160s, 180s], [320s, 340s]
×
1063
                        // Intervals are strictly non-overlapping (each min >= previous max).
×
1064
                        // For all retries to complete - min time: 620s ~ 10 min 20 sec, max time: 720s = 12 min.
×
1065
                        minBase := time.Duration(conn.RequestRetryDelayBase) * time.Second
×
1066
                        exp := time.Duration(1 << uint(retry-1)) // 2^(retry-1): 1, 2, 4, 8, 16
×
1067
                        base := minBase * exp
×
1068
                        jitter := time.Duration(rand.Int63n(int64(20 * time.Second)))
×
1069
                        backoffDuration := base + jitter
×
1070
                        conn.log.Debugf("Waiting %v before retry attempt %d", backoffDuration, retry)
×
1071
                        time.Sleep(backoffDuration)
×
1072
                        continue
×
1073
                }
1074

1075
                // Handle non-success status codes
1076
                if resp.StatusCode < 200 || resp.StatusCode >= 300 {
2✔
1077
                        conn.logErrorResp("Error response from APIC ", resp)
1✔
1078
                        complete(resp)
1✔
1079
                        if restart {
2✔
1080
                                conn.restart()
1✔
1081
                        }
1✔
1082
                        return resp, fmt.Errorf("Got error response from APIC")
1✔
1083
                }
1084

1085
                //success case
1086
                return resp, nil
1✔
1087
        }
1088
}
1089

1090
func (conn *ApicConnection) logErrorResp(message string, resp *http.Response) {
1✔
1091
        var apicresp ApicResponse
1✔
1092
        err := json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
1093
        if err != nil {
2✔
1094
                conn.log.Error("Could not parse APIC error response: ", err)
1✔
1095
        } else {
2✔
1096
                code := 0
1✔
1097
                text := ""
1✔
1098
                for _, o := range apicresp.Imdata {
2✔
1099
                        if ob, ok := o["error"]; ok {
2✔
1100
                                if ob.Attributes != nil {
2✔
1101
                                        if t, isStr := ob.Attributes["text"].(string); isStr {
2✔
1102
                                                text = t
1✔
1103
                                        }
1✔
1104
                                        if c, isInt := ob.Attributes["code"].(int); isInt {
1✔
1105
                                                code = c
×
1106
                                        }
×
1107
                                }
1108
                        }
1109
                }
1110
                conn.logger.WithFields(logrus.Fields{
1✔
1111
                        "mod":    "APICAPI",
1✔
1112
                        "text":   text,
1✔
1113
                        "code":   code,
1✔
1114
                        "url":    resp.Request.URL,
1✔
1115
                        "status": resp.StatusCode,
1✔
1116
                }).Error(message)
1✔
1117
        }
1118
}
1119

1120
// To make sure cluster's POD/NodeBDs and L3OUT are all mapped
1121
// to same and correct VRF.
1122
func (conn *ApicConnection) ValidateAciVrfAssociation(acivrfdn string, expectedVrfRelations []string) error {
×
1123
        var aciVrfBdL3OuttDns []string
×
1124
        args := []string{
×
1125
                "query-target=subtree",
×
1126
                "target-subtree-class=fvRtCtx,fvRtEctx",
×
1127
        }
×
1128

×
1129
        uri := fmt.Sprintf("/api/mo/%s.json?%s", acivrfdn, strings.Join(args, "&"))
×
1130
        resp, err := conn.sendHTTPSRequestToAPIC("GET", uri, nil, "", true, nil)
×
1131
        if err != nil {
×
1132
                return err
×
1133
        }
×
1134
        defer complete(resp)
×
1135

×
1136
        var apicresp ApicResponse
×
1137
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
×
1138
        if err != nil {
×
1139
                conn.log.Error("Could not parse APIC response: ", err)
×
1140
                return err
×
1141
        }
×
1142

1143
        for _, obj := range apicresp.Imdata {
×
1144
                for _, body := range obj {
×
1145
                        tDn, ok := body.Attributes["tDn"].(string)
×
1146
                        if !ok {
×
1147
                                continue
×
1148
                        }
1149
                        aciVrfBdL3OuttDns = append(aciVrfBdL3OuttDns, tDn)
×
1150
                }
1151
        }
1152
        sort.Strings(aciVrfBdL3OuttDns)
×
1153
        conn.log.Debug("aciVrfBdL3OuttDns:", aciVrfBdL3OuttDns)
×
1154
        for _, expectedDn := range expectedVrfRelations {
×
1155
                i := sort.SearchStrings(aciVrfBdL3OuttDns, expectedDn)
×
1156
                if !(i < len(aciVrfBdL3OuttDns) && aciVrfBdL3OuttDns[i] == expectedDn) {
×
1157
                        conn.log.Debug("Missing (or) Incorrect Vrf association: ", expectedDn)
×
1158
                        return errors.New("Incorrect Pod/NodeBD/L3OUT VRF association")
×
1159
                }
×
1160
        }
1161
        return nil
×
1162
}
1163

1164
func (conn *ApicConnection) getSubtreeDn(dn string, respClasses []string,
1165
        updateHandlers []ApicObjectHandler) {
1✔
1166
        // If there are no update handlers, there is no point doing anything till sync is enabled as reconcileApicObject just returns
1✔
1167
        if len(updateHandlers) == 0 {
2✔
1168
                conn.indexMutex.Lock()
1✔
1169
                if !conn.syncEnabled {
1✔
1170
                        conn.indexMutex.Unlock()
×
1171
                        return
×
1172
                }
×
1173
                conn.indexMutex.Unlock()
1✔
1174
        }
1175
        args := []string{
1✔
1176
                "rsp-subtree=full",
1✔
1177
        }
1✔
1178

1✔
1179
        if len(respClasses) > 0 {
2✔
1180
                args = append(args, "rsp-subtree-class="+strings.Join(respClasses, ","))
1✔
1181
        }
1✔
1182
        // properly encoding the URI query parameters breaks APIC
1183
        uri := fmt.Sprintf("/api/mo/%s.json?%s", dn, strings.Join(args, "&"))
1✔
1184
        conn.log.Debugf("URL: %v", uri)
1✔
1185
        resp, err := conn.sendHTTPSRequestToAPIC("GET", uri, nil, "", true, nil)
1✔
1186
        if err != nil {
1✔
1187
                return
×
1188
        }
×
1189
        defer complete(resp)
1✔
1190

1✔
1191
        var apicresp ApicResponse
1✔
1192
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
1193
        if err != nil {
1✔
1194
                conn.log.Error("Could not parse APIC response: ", err)
×
1195
                return
×
1196
        }
×
1197
        if len(apicresp.Imdata) == 0 {
1✔
1198
                conn.log.Debugf("No subtree found for dn %s", dn)
×
1199
        }
×
1200

1201
        for _, obj := range apicresp.Imdata {
2✔
1202
                conn.logger.WithFields(logrus.Fields{
1✔
1203
                        "mod": "APICAPI",
1✔
1204
                        "dn":  obj.GetDn(),
1✔
1205
                        "obj": obj,
1✔
1206
                }).Debug("Object updated on APIC")
1✔
1207
                var count int
1✔
1208
                prepareApicCache("", obj, &count)
1✔
1209

1✔
1210
                handled := false
1✔
1211
                for _, handler := range updateHandlers {
1✔
1212
                        if handler(obj) {
×
1213
                                handled = true
×
1214
                                break
×
1215
                        }
1216
                }
1217
                if handled {
1✔
1218
                        continue
×
1219
                }
1220
                conn.reconcileApicObject(obj)
1✔
1221
        }
1222
}
1223

1224
func (conn *ApicConnection) queuePriorityDn(dn string) {
×
1225
        conn.indexMutex.Lock()
×
1226
        if conn.priorityQueue != nil {
×
1227
                conn.addToQueue(conn.priorityQueue, dn)
×
1228
        }
×
1229
        conn.indexMutex.Unlock()
×
1230
}
1231

1232
func (conn *ApicConnection) queueDn(dn string) {
1✔
1233
        conn.indexMutex.Lock()
1✔
1234
        if conn.deltaQueue != nil {
2✔
1235
                conn.addToQueue(conn.deltaQueue, dn)
1✔
1236
        }
1✔
1237
        conn.indexMutex.Unlock()
1✔
1238
}
1239

1240
func (conn *ApicConnection) ForceRelogin() {
×
1241
        conn.token = ""
×
1242
}
×
1243

1244
/*
1245
// Commented out - function no longer in use
1246
func (conn *ApicConnection) PostDnInline(dn string, obj ApicObject) error {
1247
        conn.logger.WithFields(logrus.Fields{
1248
                "mod": "APICAPI",
1249
                "dn":  dn,
1250
                "obj": obj,
1251
        }).Debug("Posting Dn Inline")
1252
        if conn.token == "" {
1253
                token, err := conn.login()
1254
                if err != nil {
1255
                        conn.log.Errorf("Login: %v", err)
1256
                        return err
1257
                }
1258
                conn.token = token
1259
        }
1260
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
1261
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1262
        raw, err := json.Marshal(obj)
1263
        if err != nil {
1264
                conn.log.Error("Could not serialize object for dn ", dn, ": ", err)
1265
                return err
1266
        }
1267
        req, err := http.NewRequest("POST", url, bytes.NewBuffer(raw))
1268
        if err != nil {
1269
                conn.log.Error("Could not create request: ", err)
1270
                return err
1271
        }
1272
        conn.sign(req, uri, raw)
1273
        req.Header.Set("Content-Type", "application/json")
1274
        conn.log.Infof("Post: %+v", req)
1275
        resp, err := conn.client.Do(req)
1276
        if err != nil {
1277
                conn.log.Error("Could not update dn ", dn, ": ", err)
1278
                return err
1279
        }
1280

1281
        complete(resp)
1282
        if resp.StatusCode != http.StatusOK {
1283
                return fmt.Errorf("Status: %v", resp.StatusCode)
1284
        }
1285
        return nil
1286
}
1287

1288
// Commented out - function no longer in use
1289
func (conn *ApicConnection) DeleteDnInline(dn string) error {
1290
        conn.logger.WithFields(logrus.Fields{
1291
                "mod": "APICAPI",
1292
                "dn":  dn,
1293
        }).Debug("Deleting Dn Inline")
1294
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
1295
        url := fmt.Sprintf("https://%s%s", conn.Apic[conn.ApicIndex], uri)
1296
        req, err := http.NewRequest("DELETE", url, http.NoBody)
1297
        if err != nil {
1298
                conn.log.Error("Could not create delete request: ", err)
1299
                return err
1300
        }
1301
        conn.sign(req, uri, nil)
1302
        resp, err := conn.client.Do(req)
1303
        if err != nil {
1304
                conn.log.Error("Could not delete dn ", dn, ": ", err)
1305
                return err
1306
        }
1307
        // TODO: handle resp statusCode
1308
        defer complete(resp)
1309
        return nil
1310
}
1311
*/
1312

1313
func (conn *ApicConnection) postDn(dn string, obj ApicObject) bool {
1✔
1314
        conn.logger.WithFields(logrus.Fields{
1✔
1315
                "mod": "APICAPI",
1✔
1316
                "dn":  dn,
1✔
1317
                "obj": obj,
1✔
1318
        }).Debug("Posting Dn")
1✔
1319

1✔
1320
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
1✔
1321
        raw, err := json.Marshal(obj)
1✔
1322
        if err != nil {
1✔
1323
                conn.log.Error("Could not serialize object for dn ", dn, ": ", err)
×
1324
        }
×
1325
        resp, err := conn.sendHTTPSRequestToAPIC("POST", uri, raw, "application/json", true, []int{400})
1✔
1326
        if err != nil {
1✔
1327
                return false
×
1328
        }
×
1329
        defer complete(resp)
1✔
1330
        if resp.StatusCode == 400 {
1✔
1331
                return true
×
1332
        }
×
1333
        return false
1✔
1334
}
1335

1336
func (conn *ApicConnection) Delete(dn string) bool {
1✔
1337
        if dn == "" {
1✔
1338
                conn.log.Debug("Skip delete for empty Dn: ")
×
1339
                return false
×
1340
        }
×
1341
        dnSlice := strings.Split(dn, "/")
1✔
1342
        identifier := dnSlice[len(dnSlice)-1]
1✔
1343
        iSlice := strings.SplitN(identifier, "-", 2)
1✔
1344
        if len(iSlice) == 2 {
2✔
1345
                if iSlice[0] == "ip" {
1✔
1346
                        addr := strings.Trim(iSlice[1], "[]")
×
1347
                        obj := NewDeleteHostprotRemoteIp(addr)
×
1348
                        conn.log.Debug("Posting delete of dn ", dn)
×
1349
                        return conn.postDn(dn, obj)
×
1350
                } else if iSlice[0] == "odev" {
1✔
1351
                        conn.log.Debug("Skipping delete of opflexODev : ", dn)
×
1352
                        return false
×
1353
                }
×
1354
        }
1355
        return conn.DeleteDn(dn)
1✔
1356
}
1357

1358
func (conn *ApicConnection) DeleteDn(dn string) bool {
1✔
1359
        if dn == "" {
1✔
1360
                conn.log.Debug("Skip delete for empty Dn: ")
×
1361
                return false
×
1362
        }
×
1363
        conn.logger.WithFields(logrus.Fields{
1✔
1364
                "mod": "APICAPI",
1✔
1365
                "dn":  dn,
1✔
1366
        }).Debug("Deleting Dn")
1✔
1367
        uri := fmt.Sprintf("/api/mo/%s.json", dn)
1✔
1368
        resp, err := conn.sendHTTPSRequestToAPIC("DELETE", uri, nil, "", true, nil)
1✔
1369
        if err != nil {
1✔
1370
                return false
×
1371
        }
×
1372
        defer complete(resp)
1✔
1373
        return false
1✔
1374
}
1375

1376
func doComputeRespClasses(targetClasses []string,
1377
        visited map[string]bool) {
1✔
1378
        for _, class := range targetClasses {
2✔
1379
                if visited[class] {
1✔
1380
                        continue
×
1381
                }
1382
                visited[class] = true
1✔
1383
                if md, ok := metadata[class]; ok {
2✔
1384
                        doComputeRespClasses(md.children, visited)
1✔
1385
                }
1✔
1386
        }
1387
}
1388

1389
func computeRespClasses(targetClasses []string) []string {
1✔
1390
        visited := make(map[string]bool)
1✔
1391
        doComputeRespClasses(targetClasses, visited)
1✔
1392

1✔
1393
        var respClasses []string
1✔
1394
        for class := range visited {
2✔
1395
                respClasses = append(respClasses, class)
1✔
1396
        }
1✔
1397
        respClasses = append(respClasses, "tagAnnotation")
1✔
1398
        return respClasses
1✔
1399
}
1400

1401
// AddSubscriptionTree subscribe at a subtree level. class specifies
1402
// the root. Changes will cause entire subtree of the rootdn to be fetched
1403
func (conn *ApicConnection) AddSubscriptionTree(class string,
1404
        targetClasses []string, targetFilter string) {
1✔
1405
        if _, ok := classDepth[class]; !ok {
1✔
1406
                errStr := fmt.Sprintf("classDepth not for class %s", class)
×
1407
                panic(errStr)
×
1408
        }
1409

1410
        conn.indexMutex.Lock()
1✔
1411
        conn.subscriptions.subs[class] = &subscription{
1✔
1412
                kind:          apicSubTree,
1✔
1413
                childSubs:     make(map[string]subComponent),
1✔
1414
                targetClasses: targetClasses,
1✔
1415
                targetFilter:  targetFilter,
1✔
1416
        }
1✔
1417
        conn.indexMutex.Unlock()
1✔
1418
}
1419

1420
func (conn *ApicConnection) AddSubscriptionClass(class string,
1421
        targetClasses []string, targetFilter string) {
1✔
1422
        conn.indexMutex.Lock()
1✔
1423
        conn.subscriptions.subs[class] = &subscription{
1✔
1424
                kind:          apicSubClass,
1✔
1425
                childSubs:     make(map[string]subComponent),
1✔
1426
                targetClasses: targetClasses,
1✔
1427
                respClasses:   computeRespClasses(targetClasses),
1✔
1428
                targetFilter:  targetFilter,
1✔
1429
        }
1✔
1430
        conn.indexMutex.Unlock()
1✔
1431
}
1✔
1432

1433
func (conn *ApicConnection) AddSubscriptionDn(dn string,
1434
        targetClasses []string) {
1✔
1435
        conn.logger.WithFields(logrus.Fields{
1✔
1436
                "mod": "APICAPI",
1✔
1437
                "dn":  dn,
1✔
1438
        }).Debug("Adding Subscription for Dn")
1✔
1439

1✔
1440
        conn.indexMutex.Lock()
1✔
1441
        conn.subscriptions.subs[dn] = &subscription{
1✔
1442
                kind:          apicSubDn,
1✔
1443
                childSubs:     make(map[string]subComponent),
1✔
1444
                targetClasses: targetClasses,
1✔
1445
                respClasses:   computeRespClasses(targetClasses),
1✔
1446
        }
1✔
1447
        conn.indexMutex.Unlock()
1✔
1448
}
1✔
1449

1450
func (conn *ApicConnection) AddSyncDn(dn string,
1451
        targetClasses []string) {
×
1452
        conn.logger.WithFields(logrus.Fields{
×
1453
                "mod": "APICAPI",
×
1454
                "dn":  dn,
×
1455
        }).Debug("Adding Dn for syncing")
×
1456
        conn.indexMutex.Lock()
×
1457
        conn.syncStore[dn] = &syncObj{
×
1458
                kind:          apicSubDn,
×
1459
                targetClasses: targetClasses,
×
1460
                respClasses:   computeRespClasses(targetClasses),
×
1461
        }
×
1462
        conn.indexMutex.Unlock()
×
1463
}
×
1464

1465
func (conn *ApicConnection) SetSyncHooks(value string,
1466
        updateHook ApicObjectHandler, deleteHook ApicDnHandler) {
×
1467
        conn.indexMutex.Lock()
×
1468
        if s, ok := conn.subscriptions.subs[value]; ok {
×
1469
                s.updateHook = updateHook
×
1470
                s.deleteHook = deleteHook
×
1471
        }
×
1472
        conn.indexMutex.Unlock()
×
1473
}
1474

1475
func (conn *ApicConnection) SetSubscriptionHooks(value string,
1476
        updateHook ApicObjectHandler, deleteHook ApicDnHandler) {
1✔
1477
        conn.indexMutex.Lock()
1✔
1478
        if s, ok := conn.subscriptions.subs[value]; ok {
2✔
1479
                s.updateHook = updateHook
1✔
1480
                s.deleteHook = deleteHook
1✔
1481
        }
1✔
1482
        conn.indexMutex.Unlock()
1✔
1483
}
1484

1485
func (conn *ApicConnection) GetApicResponse(uri string) (ApicResponse, error) {
1✔
1486
        conn.log.Debug("apicIndex: ", conn.Apic[conn.ApicIndex], " uri: ", uri)
1✔
1487
        conn.log.Debug("Apic Get url: ", uri)
1✔
1488
        var apicresp ApicResponse
1✔
1489
        resp, err := conn.sendHTTPSRequestToAPIC("GET", uri, nil, "", false, nil)
1✔
1490
        if err != nil {
2✔
1491
                return apicresp, err
1✔
1492
        }
1✔
1493
        defer complete(resp)
1✔
1494
        err = json.NewDecoder(resp.Body).Decode(&apicresp)
1✔
1495
        if err != nil {
1✔
1496
                conn.log.Error("Could not parse APIC response: ", err)
×
1497
                return apicresp, err
×
1498
        }
×
1499
        return apicresp, nil
1✔
1500
}
1501

1502
func (conn *ApicConnection) doSubscribe(args []string,
1503
        kind, value, refresh_interval string, apicresp *ApicResponse) bool {
1✔
1504
        // properly encoding the URI query parameters breaks APIC
1✔
1505
        uri := fmt.Sprintf("/api/%s/%s.json?subscription=yes&%s%s",
1✔
1506
                kind, value, refresh_interval, strings.Join(args, "&"))
1✔
1507
        conn.log.Info("APIC connection URL: ", uri)
1✔
1508
        resp, err := conn.sendHTTPSRequestToAPIC("GET", uri, nil, "", false, nil)
1✔
1509
        if err != nil {
1✔
1510
                return false
×
1511
        }
×
1512
        defer complete(resp)
1✔
1513

1✔
1514
        err = json.NewDecoder(resp.Body).Decode(apicresp)
1✔
1515
        if err != nil {
1✔
1516
                conn.log.Error("Could not decode APIC response", err)
×
1517
                return false
×
1518
        }
×
1519
        time.Sleep(conn.SubscriptionDelay)
1✔
1520
        return true
1✔
1521
}
1522

1523
func (conn *ApicConnection) isPresentInOpflexOdevCache(dn, status string, obj ApicObject) bool {
×
1524
        if _, ok := conn.cacheOpflexOdev[dn]; ok {
×
1525
                return true
×
1526
        }
×
1527

1528
        switch status {
×
1529
        case "created":
×
1530
                return conn.updateOpflexOdevCache(obj, false, true)
×
1531
        case "deleted":
×
1532
                return conn.updateOpflexOdevCache(obj, true, true)
×
1533
        default:
×
1534
                return false
×
1535
        }
1536
}
1537

1538
func (conn *ApicConnection) updateOpflexOdevCache(obj ApicObject, remove, lockHeld bool) bool {
×
1539
        var updated bool
×
1540
        dn := obj.GetDn()
×
1541
        if !lockHeld {
×
1542
                conn.indexMutex.Lock()
×
1543
                defer conn.indexMutex.Unlock()
×
1544
        }
×
1545

1546
        if remove {
×
1547
                if _, ok := conn.cacheOpflexOdev[dn]; ok {
×
1548
                        delete(conn.cacheOpflexOdev, dn)
×
1549
                        updated = true
×
1550
                        conn.log.Info("Removed dn from opflexOdev cache: ", dn)
×
1551
                }
×
1552
        } else if strings.Contains(obj.GetDomName(), conn.VmmDomain) ||
×
1553
                (strings.Contains(conn.Flavor, "openstack") && strings.Contains(obj.GetCompHvDn(), "/prov-OpenStack/")) {
×
1554
                conn.cacheOpflexOdev[dn] = struct{}{}
×
1555
                updated = true
×
1556
                conn.log.Info("Added dn in opflexOdev cache: ", dn)
×
1557
        }
×
1558
        return updated
×
1559
}
1560

1561
func (conn *ApicConnection) computeArgSet(targetClasses, respClasses []string, targetFilter string, defaultArgs int) (splitTargetClasses, splitRespClasses, argSet [][]string, argCount int) {
1✔
1562

1✔
1563
        baseArgs := []string{
1✔
1564
                "query-target=subtree",
1✔
1565
                "rsp-subtree=full",
1✔
1566
                "target-subtree-class=" + strings.Join(targetClasses, ","),
1✔
1567
        }
1✔
1568

1✔
1569
        argCount = defaultArgs
1✔
1570
        var combinableSubClasses, separableSubClasses []string
1✔
1571
        argSet = make([][]string, defaultArgs)
1✔
1572
        argSet[defaultArgs-1] = make([]string, len(baseArgs))
1✔
1573
        copy(argSet[defaultArgs-1], baseArgs)
1✔
1574
        if respClasses != nil {
2✔
1575
                separateClasses := func(classes []string, combClasses, sepClasses *[]string) {
2✔
1576
                        for i := range classes {
2✔
1577
                                if classMeta, ok := metadata[classes[i]]; ok {
2✔
1578
                                        if classes[i] == "tagAnnotation" {
2✔
1579
                                                continue
1✔
1580
                                        }
1581
                                        if classMeta.hints != nil && classMeta.hints["cardinality"] == "high" {
2✔
1582
                                                *sepClasses = append(*sepClasses, classes[i])
1✔
1583
                                                continue
1✔
1584
                                        }
1585
                                        *combClasses = append(*combClasses, classes[i])
1✔
1586
                                }
1587
                        }
1588
                }
1589
                separateClasses(respClasses, &combinableSubClasses, &separableSubClasses)
1✔
1590

1✔
1591
                // In case there are high cardinality children, we register for all the classes individually.
1✔
1592
                // The concept of target-subtree and rsp-subtree class cannot be used because of the tagAnnotation object
1✔
1593
                // vmmInjectedLabel is added for every object, so getting it separately will not be scalable
1✔
1594
                if len(separableSubClasses) > 0 {
2✔
1595
                        separateClasses(targetClasses, &combinableSubClasses, &separableSubClasses)
1✔
1596
                        separableSubClasses = append(separableSubClasses, combinableSubClasses...)
1✔
1597
                        baseArgs = []string{
1✔
1598
                                "query-target=subtree",
1✔
1599
                                "rsp-subtree=children",
1✔
1600
                        }
1✔
1601
                        subscribingClasses := make(map[string]bool)
1✔
1602
                        argSet = make([][]string, len(separableSubClasses))
1✔
1603
                        splitTargetClasses = make([][]string, len(separableSubClasses))
1✔
1604
                        splitRespClasses = make([][]string, len(separableSubClasses))
1✔
1605

1✔
1606
                        argCount = 0
1✔
1607
                        for i := range separableSubClasses {
2✔
1608
                                // Eliminate duplicates
1✔
1609
                                if _, ok := subscribingClasses[separableSubClasses[i]]; ok {
2✔
1610
                                        continue
1✔
1611
                                }
1612
                                subscribingClasses[separableSubClasses[i]] = true
1✔
1613
                                argSet[argCount] = make([]string, len(baseArgs))
1✔
1614
                                copy(argSet[argCount], baseArgs)
1✔
1615
                                argSet[argCount] = append(argSet[argCount], "target-subtree-class="+separableSubClasses[i], "rsp-subtree-class=tagAnnotation")
1✔
1616
                                splitTargetClasses[argCount] = append(splitTargetClasses[argCount], separableSubClasses[i])
1✔
1617
                                splitRespClasses[argCount] = computeRespClasses([]string{separableSubClasses[i]})
1✔
1618
                                argCount++
1✔
1619
                        }
1620
                } else {
1✔
1621
                        argSet[defaultArgs-1] = append(argSet[defaultArgs-1], "rsp-subtree-class="+strings.Join(combinableSubClasses, ",")+",tagAnnotation")
1✔
1622
                }
1✔
1623
        }
1624
        if targetFilter != "" {
2✔
1625
                targetFilterArgs := "query-target-filter=" + targetFilter
1✔
1626
                if len(separableSubClasses) == 0 {
2✔
1627
                        argSet[defaultArgs-1] = append(argSet[defaultArgs-1], targetFilterArgs)
1✔
1628
                } else {
1✔
1629
                        for i := 0; i < argCount; i++ {
×
1630
                                argSet[i] = append(argSet[i], targetFilterArgs)
×
1631
                        }
×
1632
                }
1633
        }
1634

1635
        return
1✔
1636
}
1637

1638
func (conn *ApicConnection) processImdata(value string, subId string, args []string, imdata ApicSlice, updateHook ApicObjectHandler, lockHeld bool) {
1✔
1639
        var respObjCount int
1✔
1640
        for _, obj := range imdata {
2✔
1641
                dn := obj.GetDn()
1✔
1642
                if dn == "" {
1✔
1643
                        continue
×
1644
                }
1645
                if subId != "" {
2✔
1646
                        if !lockHeld {
2✔
1647
                                conn.indexMutex.Lock()
1✔
1648
                        }
1✔
1649
                        subIds, found := conn.cacheDnSubIds[dn]
1✔
1650
                        if !found {
2✔
1651
                                subIds = make(map[string]bool)
1✔
1652
                                conn.cacheDnSubIds[dn] = subIds
1✔
1653
                        }
1✔
1654
                        subIds[subId] = true
1✔
1655
                        if !lockHeld {
2✔
1656
                                conn.indexMutex.Unlock()
1✔
1657
                        }
1✔
1658
                }
1659
                if value == "opflexODev" && conn.FilterOpflexDevice {
1✔
1660
                        conn.updateOpflexOdevCache(obj, false, lockHeld)
×
1661
                }
×
1662
                if updateHook != nil && updateHook(obj) {
2✔
1663
                        continue
1✔
1664
                }
1665

1666
                tag := obj.GetTag()
1✔
1667
                if !conn.isSyncTag(tag) {
1✔
1668
                        continue
×
1669
                }
1670

1671
                conn.logger.WithFields(logrus.Fields{
1✔
1672
                        "mod": "APICAPI",
1✔
1673
                        "dn":  dn,
1✔
1674
                        "tag": tag,
1✔
1675
                        "obj": obj,
1✔
1676
                }).Debug("Caching")
1✔
1677
                var count int
1✔
1678
                prepareApicCache("", obj, &count)
1✔
1679
                respObjCount += count
1✔
1680
                if !lockHeld {
2✔
1681
                        conn.indexMutex.Lock()
1✔
1682
                }
1✔
1683
                conn.cachedState[tag] = append(conn.cachedState[tag], obj)
1✔
1684
                if !lockHeld {
2✔
1685
                        conn.indexMutex.Unlock()
1✔
1686
                }
1✔
1687
        }
1688
        if respObjCount >= ApicSubscriptionResponseMoMaxCount/10 {
1✔
1689
                conn.logger.WithFields(logrus.Fields{
×
1690
                        "args":       args,
×
1691
                        "moCount":    respObjCount,
×
1692
                        "maxAllowed": ApicSubscriptionResponseMoMaxCount,
×
1693
                }).Warning("Subscription response is significantly large. Each new object will add 2 Mos atleast and twice the number of labels on the object")
×
1694
        } else {
1✔
1695
                conn.logger.WithFields(logrus.Fields{
1✔
1696
                        "moCount": respObjCount,
1✔
1697
                }).Debug("ResponseObjCount")
1✔
1698
        }
1✔
1699
}
1700

1701
func (conn *ApicConnection) getSyncObject(value string, obj *syncObj) bool {
×
1702
        const defaultArgs = 1
×
1703
        _, _, argSet, argCount := conn.computeArgSet(obj.targetClasses, obj.respClasses, obj.targetFilter, defaultArgs)
×
1704

×
1705
        kind := "mo"
×
1706
        if obj.kind == apicSubClass || obj.kind == apicSubTree {
×
1707
                kind = "class"
×
1708
        }
×
1709

1710
        for i := 0; i < argCount; i++ {
×
1711
                // properly encoding the URI query parameters breaks APIC
×
1712
                uri := fmt.Sprintf("/api/%s/%s.json?%s", kind, value, strings.Join(argSet[i], "&"))
×
1713
                resp, err := conn.sendHTTPSRequestToAPIC("GET", uri, nil, "", false, nil)
×
1714
                if err != nil {
×
1715
                        return false
×
1716
                }
×
1717
                defer complete(resp)
×
1718
                var apicresp ApicResponse
×
1719
                err = json.NewDecoder(resp.Body).Decode(&apicresp)
×
1720
                if err != nil {
×
1721
                        conn.log.Error("Could not decode APIC response", err)
×
1722
                        return false
×
1723
                }
×
1724
                conn.logger.WithFields(logrus.Fields{
×
1725
                        "mod":   "APICAPI",
×
1726
                        "value": value,
×
1727
                        "kind":  kind,
×
1728
                        "args":  argSet[i],
×
1729
                }).Debug("Fetched")
×
1730

×
1731
                conn.processImdata(value, "", argSet[i], apicresp.Imdata, nil, false)
×
1732
        }
1733

1734
        return true
×
1735
}
1736

1737
func (conn *ApicConnection) subscribe(value string, sub *subscription, lockHeld bool) bool {
1✔
1738
        const defaultArgs = 1
1✔
1739
        splitTargetClasses, splitRespClasses, argSet, argCount := conn.computeArgSet(sub.targetClasses, sub.respClasses, sub.targetFilter, defaultArgs)
1✔
1740

1✔
1741
        kind := "mo"
1✔
1742
        if sub.kind == apicSubClass || sub.kind == apicSubTree {
2✔
1743
                kind = "class"
1✔
1744
        }
1✔
1745

1746
        refresh_interval := ""
1✔
1747
        if conn.RefreshInterval != 0 {
2✔
1748
                refresh_interval = fmt.Sprintf("refresh-timeout=%v&",
1✔
1749
                        conn.RefreshInterval.Seconds())
1✔
1750
        }
1✔
1751
        for i := 0; i < argCount; i++ {
2✔
1752
                var apicresp ApicResponse
1✔
1753
                if !conn.doSubscribe(argSet[i], kind, value, refresh_interval, &apicresp) {
1✔
1754
                        return false
×
1755
                }
×
1756
                subId, ok := apicresp.SubscriptionId.(string)
1✔
1757
                if !ok {
1✔
1758
                        conn.log.Error("Subscription ID is not a string")
×
1759
                        return false
×
1760
                }
×
1761

1762
                conn.logger.WithFields(logrus.Fields{
1✔
1763
                        "mod":   "APICAPI",
1✔
1764
                        "value": value,
1✔
1765
                        "kind":  kind,
1✔
1766
                        "id":    subId,
1✔
1767
                        "args":  argSet[i],
1✔
1768
                }).Debug("Subscribed")
1✔
1769
                if !lockHeld {
2✔
1770
                        conn.indexMutex.Lock()
1✔
1771
                }
1✔
1772
                if argCount > defaultArgs {
2✔
1773
                        sub.childSubs[subId] = subComponent{
1✔
1774
                                targetClasses: splitTargetClasses[i],
1✔
1775
                                respClasses:   splitRespClasses[i],
1✔
1776
                        }
1✔
1777
                } else {
2✔
1778
                        conn.subscriptions.subs[value].id = subId
1✔
1779
                }
1✔
1780
                conn.subscriptions.ids[subId] = value
1✔
1781
                if !lockHeld {
2✔
1782
                        conn.indexMutex.Unlock()
1✔
1783
                }
1✔
1784

1785
                conn.processImdata(value, subId, argSet[i], apicresp.Imdata, sub.updateHook, lockHeld)
1✔
1786
        }
1787

1788
        return true
1✔
1789
}
1790

1791
var tagRegexp = regexp.MustCompile(`[a-zA-Z0-9_]{1,31}-[a-f0-9]{32}`)
1792

1793
func (conn *ApicConnection) isSyncTag(tag string) bool {
1✔
1794
        return tagRegexp.MatchString(tag) &&
1✔
1795
                strings.HasPrefix(tag, conn.prefix+"-")
1✔
1796
}
1✔
1797

1798
// isOwnedByController fetches the object from APIC and checks whether
1799
// its tagAnnotation matches this controller's sync tag. Returns true
1800
// only when the object exists on APIC and carries this controller's tag.
1801
func (conn *ApicConnection) isOwnedByController(dn string) bool {
×
1802
        uri := fmt.Sprintf("/api/mo/%s.json?rsp-subtree=children&rsp-subtree-class=tagAnnotation", dn)
×
1803
        resp, err := conn.GetApicResponse(uri)
×
1804
        if err != nil {
×
1805
                conn.log.Warning("Failed to fetch object for tag check: ", dn, " err: ", err)
×
1806
                return false
×
1807
        }
×
1808
        if len(resp.Imdata) == 0 {
×
1809
                // Object no longer exists on APIC; nothing to protect.
×
1810
                conn.log.Debug("Object not found on APIC for tag check: ", dn)
×
1811
                return false
×
1812
        }
×
1813
        tag := resp.Imdata[0].GetTag()
×
1814
        if conn.isSyncTag(tag) {
×
1815
                return true
×
1816
        }
×
1817
        conn.log.Debug("Object tag does not match controller prefix, skipping: ",
×
1818
                dn, " tag: ", tag)
×
1819
        return false
×
1820
}
1821

1822
func getRootDn(dn, rootClass string) string {
1✔
1823
        depth := classDepth[rootClass]
1✔
1824
        parts := strings.Split(dn, "/")
1✔
1825
        parts = parts[:depth]
1✔
1826
        return strings.Join(parts, "/")
1✔
1827
}
1✔
1828

1829
func (conn *ApicConnection) PostApicObjects(uri string, payload ApicSlice) error {
×
1830
        conn.log.Debug("apicIndex: ", conn.Apic[conn.ApicIndex], " uri: ", uri)
×
1831
        conn.log.Debug("Apic POST url: ", uri)
×
1832

×
1833
        if conn.token == "" {
×
1834
                token, err := conn.login()
×
1835
                if err != nil {
×
1836
                        conn.log.Errorf("Login: %v", err)
×
1837
                        return err
×
1838
                }
×
1839
                conn.token = token
×
1840
        }
1841

1842
        raw, err := json.Marshal(payload)
×
1843
        if err != nil {
×
1844
                conn.log.Error("Could not serialize object: ", err)
×
1845
                return err
×
1846
        }
×
1847
        conn.log.Infof("Post: %+v", &struct{ Method, URI string }{"POST", uri})
×
1848
        resp, err := conn.sendHTTPSRequestToAPIC("POST", uri, raw, "application/json", false, nil)
×
1849
        if err != nil {
×
1850
                return err
×
1851
        }
×
1852
        defer complete(resp)
×
1853

×
1854
        if resp.StatusCode != http.StatusOK {
×
1855
                return fmt.Errorf("Status: %v", resp.StatusCode)
×
1856
        }
×
1857

1858
        return nil
×
1859
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc