Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

weaveworks / weave / #7201

20 Sep 2016 - 16:03 coverage decreased (-0.1%) to 73.98%
#7201

Pull #2501

circleci

7de8841b37903eead0e0bb4d567e9454?size=18&default=identiconbrb
Update weaveworks/mesh submodule
Pull Request #2501: Reject empty peers

6616 of 8943 relevant lines covered (73.98%)

121129.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.52
/router/fastdp.go
1
package router
2

3
import (
4
        "encoding/binary"
5
        "encoding/json"
6
        "fmt"
7
        "net"
8
        "sync"
9
        "syscall"
10
        "time"
11

12
        "github.com/vishvananda/netlink"
13
        "github.com/weaveworks/go-odp/odp"
14
        "github.com/weaveworks/mesh"
15
)
16

17
// The virtual bridge accepts packets from ODP vports and the router
18
// port (i.e. InjectPacket).  We need a map key to index those
19
// possibilities:
20
type bridgePortID struct {
21
        vport  odp.VportID
22
        router bool
23
}
24

25
// A bridgeSender sends out a packet from the virtual bridge
26
type bridgeSender func(key PacketKey, lock *fastDatapathLock) FlowOp
27

28
// A missHandler handles an ODP miss
29
type missHandler func(fks odp.FlowKeys, lock *fastDatapathLock) FlowOp
30

31
type FastDatapath struct {
32
        lock             sync.Mutex // guards state and synchronises use of dpif
33
        iface            *net.Interface
34
        dpif             *odp.Dpif
35
        dp               odp.DatapathHandle
36
        deleteFlowsCount uint64
37
        missCount        uint64
38
        missHandlers     map[odp.VportID]missHandler
39
        localPeer        *mesh.Peer
40
        peers            *mesh.Peers
41
        overlayConsumer  OverlayConsumer
42

43
        // Bridge state: How to send to the given bridge port
44
        sendToPort map[bridgePortID]bridgeSender
45

46
        // How to send to a given destination MAC
47
        sendToMAC map[MAC]bridgeSender
48

49
        // MACs seen on the bridge recently
50
        seenMACs map[MAC]struct{}
51

52
        // vxlan vports associated with the given UDP ports
53
        vxlanVportIDs    map[int]odp.VportID
54
        mainVxlanVportID odp.VportID
55

56
        // A singleton pool for the occasions when we need to decode
57
        // the packet.
58
        dec *EthernetDecoder
59

60
        // forwarders by remote peer
61
        forwarders map[mesh.PeerName]*fastDatapathForwarder
62
}
63

64
func NewFastDatapath(iface *net.Interface, port int) (*FastDatapath, error) {
90×
65
        dpif, err := odp.NewDpif()
90×
66
        if err != nil {
90×
67
                return nil, err
!
68
        }
!
69

70
        success := false
90×
71
        defer func() {
180×
72
                if !success {
90×
73
                        dpif.Close()
!
74
                }
!
75
        }()
76

77
        dp, err := dpif.LookupDatapath(iface.Name)
90×
78
        if err != nil {
90×
79
                return nil, err
!
80
        }
!
81

82
        fastdp := &FastDatapath{
90×
83
                iface:         iface,
90×
84
                dpif:          dpif,
90×
85
                dp:            dp,
90×
86
                missHandlers:  make(map[odp.VportID]missHandler),
90×
87
                sendToPort:    nil,
90×
88
                sendToMAC:     make(map[MAC]bridgeSender),
90×
89
                seenMACs:      make(map[MAC]struct{}),
90×
90
                vxlanVportIDs: make(map[int]odp.VportID),
90×
91
                forwarders:    make(map[mesh.PeerName]*fastDatapathForwarder),
90×
92
        }
90×
93

90×
94
        // This delete happens asynchronously in the kernel, meaning that
90×
95
        // we can sometimes fail to recreate the vxlan vport with EADDRINUSE -
90×
96
        // consequently we retry a small number of times in
90×
97
        // getVxlanVportIDHarder() to compensate.
90×
98
        if err := fastdp.deleteVxlanVports(); err != nil {
90×
99
                return nil, err
!
100
        }
!
101

102
        if err := fastdp.deleteFlows(); err != nil {
90×
103
                return nil, err
!
104
        }
!
105

106
        // We use the weave port number plus 1 for vxlan.  Hard-coding
107
        // this relationship may seem dubious, but there is no moral
108
        // difference between this and requiring that the sleeve UDP
109
        // port number is the same as the TCP port number.  The hard
110
        // part would be not adding weaver flags to allow the port
111
        // numbers to be independent, but working out how to specify
112
        // them on the connecting side.  So we can wait to find out if
113
        // anyone wants that.
114
        fastdp.mainVxlanVportID, err = fastdp.getVxlanVportIDHarder(port+1, 5, time.Millisecond*10)
90×
115
        if err != nil {
90×
116
                return nil, err
!
117
        }
!
118

119
        // need to lock before we might receive events
120
        fastdp.lock.Lock()
90×
121
        defer fastdp.lock.Unlock()
90×
122

90×
123
        if _, err := dp.ConsumeMisses(fastdp); err != nil {
90×
124
                return nil, err
!
125
        }
!
126

127
        if _, err := dp.ConsumeVportEvents(fastdp); err != nil {
90×
128
                return nil, err
!
129
        }
!
130

131
        vports, err := dp.EnumerateVports()
90×
132
        if err != nil {
90×
133
                return nil, err
!
134
        }
!
135

136
        for _, vport := range vports {
360×
137
                fastdp.makeBridgeVport(vport)
270×
138
        }
270×
139

140
        success = true
90×
141
        go fastdp.run()
90×
142
        return fastdp, nil
90×
143
}
144

145
func (fastdp *FastDatapath) Close() error {
!
146
        fastdp.lock.Lock()
!
147
        defer fastdp.lock.Unlock()
!
148
        err := fastdp.dpif.Close()
!
149
        fastdp.dpif = nil
!
150
        return err
!
151
}
!
152

153
// While processing a packet, we can potentially acquire and drop the
154
// FastDatapath lock many times (acquiring it to acceess FastDatapath
155
// state, and invoke ODP operations; dropping it to invoke callbacks
156
// that may re-enter the FastDatapath).  A fastDatapathLock
157
// coordinates this process.
158
type fastDatapathLock struct {
159
        fastdp *FastDatapath
160
        locked bool
161

162
        // While the lock is dropped, deleteFlows could be called.  We
163
        // need to detect when this happens and avoid creating flows,
164
        // because they may be based on stale information.
165
        deleteFlowsCount uint64
166
}
167

168
func (fastdp *FastDatapath) startLock() fastDatapathLock {
2,183×
169
        fastdp.lock.Lock()
2,183×
170
        return fastDatapathLock{
2,183×
171
                fastdp:           fastdp,
2,183×
172
                locked:           true,
2,183×
173
                deleteFlowsCount: fastdp.deleteFlowsCount,
2,183×
174
        }
2,183×
175
}
2,183×
176

177
func (lock *fastDatapathLock) unlock() {
3,697×
178
        if lock.locked {
7,394×
179
                lock.fastdp.lock.Unlock()
3,697×
180
                lock.locked = false
3,697×
181
        }
3,697×
182
}
183

184
func (lock *fastDatapathLock) relock() {
4,546×
185
        if !lock.locked {
6,060×
186
                lock.fastdp.lock.Lock()
1,514×
187
                lock.locked = true
1,514×
188
        }
1,514×
189
}
190

191
// Bridge bits
192

193
type fastDatapathBridge struct {
194
        *FastDatapath
195
}
196

197
func (fastdp *FastDatapath) Bridge() Bridge {
90×
198
        return fastDatapathBridge{fastdp}
90×
199
}
90×
200

201
func (fastdp fastDatapathBridge) Interface() *net.Interface {
90×
202
        return fastdp.iface
90×
203
}
90×
204

205
func (fastdp fastDatapathBridge) String() string {
254×
206
        return fmt.Sprint(fastdp.iface.Name, " (via ODP)")
254×
207
}
254×
208

209
func (fastdp fastDatapathBridge) Stats() map[string]int {
164×
210
        lock := fastdp.startLock()
164×
211
        defer lock.unlock()
164×
212

164×
213
        return map[string]int{
164×
214
                "FlowMisses": int(fastdp.missCount),
164×
215
        }
164×
216
}
164×
217

218
var routerBridgePortID = bridgePortID{router: true}
219

220
func (fastdp fastDatapathBridge) StartConsumingPackets(consumer BridgeConsumer) error {
90×
221
        fastdp.lock.Lock()
90×
222
        defer fastdp.lock.Unlock()
90×
223

90×
224
        if fastdp.sendToPort[routerBridgePortID] != nil {
90×
225
                return fmt.Errorf("FastDatapath already has a BridgeConsumer")
!
226
        }
!
227

228
        // set up delivery to the weave router port on the bridge
229
        fastdp.addSendToPort(routerBridgePortID,
90×
230
                func(key PacketKey, lock *fastDatapathLock) FlowOp {
883×
231
                        // drop the FastDatapath lock in order to call
793×
232
                        // the consumer
793×
233
                        lock.unlock()
793×
234
                        return consumer(key)
793×
235
                })
793×
236
        return nil
90×
237
}
238

239
func (fastdp fastDatapathBridge) InjectPacket(key PacketKey) FlowOp {
501×
240
        lock := fastdp.startLock()
501×
241
        defer lock.unlock()
501×
242
        return fastdp.bridge(routerBridgePortID, key, &lock)
501×
243
}
501×
244

245
// Ethernet bridge implementation
246

247
func (fastdp *FastDatapath) bridge(ingress bridgePortID, key PacketKey, lock *fastDatapathLock) FlowOp {
1,294×
248
        lock.relock()
1,294×
249
        if fastdp.sendToMAC[key.SrcMAC] == nil {
1,603×
250
                // Learn the source MAC
309×
251
                fastdp.sendToMAC[key.SrcMAC] = fastdp.sendToPort[ingress]
309×
252
                fastdp.seenMACs[key.SrcMAC] = struct{}{}
309×
253
        }
309×
254

255
        // If we know about the destination MAC, deliver it to the
256
        // associated port.
257
        if sender := fastdp.sendToMAC[key.DstMAC]; sender != nil {
1,449×
258
                return NewMultiFlowOp(false, odpEthernetFlowKey(key), sender(key, lock))
155×
259
        }
155×
260

261
        // Otherwise, it might be a real broadcast, or it might
262
        // be for a MAC we don't know about yet.  Either way, we'll
263
        // broadcast it.
264
        mfop := NewMultiFlowOp(false)
1,139×
265

1,139×
266
        if (key.DstMAC[0] & 1) == 0 {
1,140×
267
                // Not a real broadcast, so don't create a flow rule.
1×
268
                // If we did, we'd need to delete the flows every time
1×
269
                // we learned a new MAC address, or have a more
1×
270
                // complicated selective invalidation scheme.
1×
271
                log.Debug("fastdp: unknown dst", ingress, key)
1×
272
                mfop.Add(vetoFlowCreationFlowOp{})
1×
273
        } else {
1,139×
274
                // A real broadcast
1,138×
275
                log.Debug("fastdp: broadcast", ingress, key)
1,138×
276
                mfop.Add(odpEthernetFlowKey(key))
1,138×
277
        }
1,138×
278

279
        // Send to all ports except the one it came in on. The
280
        // sendToPort map is immutable, so it is safe to iterate over
281
        // it even though the sender functions can drop the
282
        // fastDatapathLock
283
        for id, sender := range fastdp.sendToPort {
4,556×
284
                if id != ingress {
5,695×
285
                        mfop.Add(sender(key, lock))
2,278×
286
                }
2,278×
287
        }
288

289
        return mfop
1,139×
290
}
291

292
// Overlay bits
293

294
type fastDatapathOverlay struct {
295
        *FastDatapath
296
}
297

298
func (fastdp *FastDatapath) Overlay() NetworkOverlay {
90×
299
        return fastDatapathOverlay{fastdp}
90×
300
}
90×
301

302
func (fastdp fastDatapathOverlay) InvalidateRoutes() {
404×
303
        log.Debug("InvalidateRoutes")
404×
304
        fastdp.lock.Lock()
404×
305
        defer fastdp.lock.Unlock()
404×
306
        checkWarn(fastdp.deleteFlows())
404×
307
}
404×
308

309
func (fastdp fastDatapathOverlay) InvalidateShortIDs() {
1×
310
        log.Debug("InvalidateShortIDs")
1×
311
        fastdp.lock.Lock()
1×
312
        defer fastdp.lock.Unlock()
1×
313
        checkWarn(fastdp.deleteFlows())
1×
314
}
1×
315

316
func (fastDatapathOverlay) AddFeaturesTo(features map[string]string) {
!
317
        // Nothing needed.  Fast datapath support is indicated through
!
318
        // OverlaySwitch.
!
319
}
!
320

321
type FlowStatus odp.FlowInfo
322

323
func (flowStatus *FlowStatus) MarshalJSON() ([]byte, error) {
!
324
        type jsonFlowStatus struct {
!
325
                FlowKeys []string
!
326
                Actions  []string
!
327
                Packets  uint64
!
328
                Bytes    uint64
!
329
                Used     uint64
!
330
        }
!
331

!
332
        flowKeys := make([]string, 0, len(flowStatus.FlowKeys))
!
333
        for _, flowKey := range flowStatus.FlowKeys {
!
334
                if !flowKey.Ignored() {
!
335
                        flowKeys = append(flowKeys, fmt.Sprint(flowKey))
!
336
                }
!
337
        }
338

339
        actions := make([]string, 0, len(flowStatus.Actions))
!
340
        for _, action := range flowStatus.Actions {
!
341
                actions = append(actions, fmt.Sprint(action))
!
342
        }
!
343

344
        return json.Marshal(&jsonFlowStatus{flowKeys, actions, flowStatus.Packets, flowStatus.Bytes, flowStatus.Used})
!
345
}
346

347
type VportStatus odp.Vport
348

349
func (vport *VportStatus) MarshalJSON() ([]byte, error) {
3×
350
        type jsonVportStatus struct {
3×
351
                ID       odp.VportID
3×
352
                Name     string
3×
353
                TypeName string
3×
354
        }
3×
355

3×
356
        return json.Marshal(&jsonVportStatus{vport.ID, vport.Spec.Name(), vport.Spec.TypeName()})
3×
357
}
3×
358

359
func (fastdp fastDatapathOverlay) Diagnostics() interface{} {
164×
360
        lock := fastdp.startLock()
164×
361
        defer lock.unlock()
164×
362

164×
363
        vports, err := fastdp.dp.EnumerateVports()
164×
364
        checkWarn(err)
164×
365
        vportStatuses := make([]VportStatus, 0, len(vports))
164×
366
        for _, vport := range vports {
656×
367
                vportStatuses = append(vportStatuses, VportStatus(vport))
492×
368
        }
492×
369

370
        flows, err := fastdp.dp.EnumerateFlows()
164×
371
        checkWarn(err)
164×
372
        flowStatuses := make([]FlowStatus, 0, len(flows))
164×
373
        for _, flow := range flows {
314×
374
                flowStatuses = append(flowStatuses, FlowStatus(flow))
150×
375
        }
150×
376

377
        return struct {
164×
378
                Vports []VportStatus
164×
379
                Flows  []FlowStatus
164×
380
        }{
164×
381
                vportStatuses,
164×
382
                flowStatuses,
164×
383
        }
164×
384
}
385

386
func (fastdp fastDatapathOverlay) StartConsumingPackets(localPeer *mesh.Peer, peers *mesh.Peers, consumer OverlayConsumer) error {
90×
387
        fastdp.lock.Lock()
90×
388
        defer fastdp.lock.Unlock()
90×
389

90×
390
        if fastdp.overlayConsumer != nil {
90×
391
                return fmt.Errorf("FastDatapath already has an OverlayConsumer")
!
392
        }
!
393

394
        fastdp.localPeer = localPeer
90×
395
        fastdp.peers = peers
90×
396
        fastdp.overlayConsumer = consumer
90×
397
        return nil
90×
398
}
399

400
func (fastdp *FastDatapath) getVxlanVportIDHarder(udpPort int, retries int, duration time.Duration) (odp.VportID, error) {
90×
401
        var vxlanVportID odp.VportID
90×
402
        var err error
90×
403
        for try := 0; try < retries; try++ {
180×
404
                vxlanVportID, err = fastdp.getVxlanVportID(udpPort)
90×
405
                if err == nil || err != odp.NetlinkError(syscall.EADDRINUSE) {
180×
406
                        return vxlanVportID, err
90×
407
                }
90×
408
                log.Warning("Address already in use creating vxlan vport ", udpPort, " - retrying")
!
409
                time.Sleep(duration)
!
410
        }
411
        return 0, err
!
412
}
413

414
func (fastdp *FastDatapath) getVxlanVportID(udpPort int) (odp.VportID, error) {
132×
415
        fastdp.lock.Lock()
132×
416
        defer fastdp.lock.Unlock()
132×
417

132×
418
        if vxlanVportID, present := fastdp.vxlanVportIDs[udpPort]; present {
174×
419
                return vxlanVportID, nil
42×
420
        }
42×
421

422
        name := fmt.Sprintf("vxlan-%d", udpPort)
90×
423
        vxlanVportID, err := fastdp.dp.CreateVport(
90×
424
                odp.NewVxlanVportSpec(name, uint16(udpPort)))
90×
425
        if err != nil {
90×
426
                return 0, err
!
427
        }
!
428

429
        // If a netdev for the vxlan vport exists, we need to do an extra check
430
        // to bypass the kernel bug which makes the vxlan creation to complete
431
        // successfully regardless whether there were any errors when binding
432
        // to the given UDP port.
433
        if link, err := netlink.LinkByName(name); err == nil {
90×
434
                if link.Attrs().Flags&net.FlagUp == 0 {
!
435
                        // The netdev interface is down, so most likely bringing it up
!
436
                        // has failed due to the UDP port being in use.
!
437
                        if err := fastdp.dp.DeleteVport(vxlanVportID); err != nil {
!
438
                                log.Warning("Unable to remove vxlan vport %d: %s", vxlanVportID, err)
!
439
                        }
!
440
                        return 0, odp.NetlinkError(syscall.EADDRINUSE)
!
441
                }
442
        }
443

444
        fastdp.vxlanVportIDs[udpPort] = vxlanVportID
90×
445
        fastdp.missHandlers[vxlanVportID] = func(fks odp.FlowKeys, lock *fastDatapathLock) FlowOp {
651×
446
                log.Debug("ODP miss: ", fks, " on port ", vxlanVportID)
561×
447
                tunnel := fks[odp.OVS_KEY_ATTR_TUNNEL].(odp.TunnelFlowKey)
561×
448
                tunKey := tunnel.Key()
561×
449

561×
450
                lock.relock()
561×
451
                consumer := fastdp.overlayConsumer
561×
452
                if consumer == nil {
561×
453
                        return vetoFlowCreationFlowOp{}
!
454
                }
!
455

456
                srcPeer, dstPeer := fastdp.extractPeers(tunKey.TunnelId)
561×
457
                if srcPeer == nil || dstPeer == nil {
578×
458
                        return vetoFlowCreationFlowOp{}
17×
459
                }
17×
460

461
                lock.unlock()
544×
462
                pk := flowKeysToPacketKey(fks)
544×
463
                var zeroMAC MAC
544×
464
                if pk.SrcMAC == zeroMAC && pk.DstMAC == zeroMAC {
664×
465
                        return vxlanSpecialPacketFlowOp{
120×
466
                                fastdp:  fastdp,
120×
467
                                srcPeer: srcPeer,
120×
468
                                sender: &net.UDPAddr{
120×
469
                                        IP:   net.IP(tunKey.Ipv4Src[:]),
120×
470
                                        Port: udpPort,
120×
471
                                },
120×
472
                        }
120×
473
                }
120×
474

475
                key := ForwardPacketKey{
424×
476
                        SrcPeer:   srcPeer,
424×
477
                        DstPeer:   dstPeer,
424×
478
                        PacketKey: pk,
424×
479
                }
424×
480

424×
481
                var tunnelFlowKey odp.TunnelFlowKey
424×
482
                tunnelFlowKey.SetTunnelId(tunKey.TunnelId)
424×
483
                tunnelFlowKey.SetIpv4Src(tunKey.Ipv4Src)
424×
484
                tunnelFlowKey.SetIpv4Dst(tunKey.Ipv4Dst)
424×
485

424×
486
                return NewMultiFlowOp(false, odpFlowKey(tunnelFlowKey), consumer(key))
424×
487
        }
488

489
        return vxlanVportID, nil
90×
490
}
491

492
func (fastdp *FastDatapath) extractPeers(tunnelID [8]byte) (*mesh.Peer, *mesh.Peer) {
561×
493
        vni := binary.BigEndian.Uint64(tunnelID[:])
561×
494
        srcPeer := fastdp.peers.FetchByShortID(mesh.PeerShortID(vni & 0xfff))
561×
495
        dstPeer := fastdp.peers.FetchByShortID(mesh.PeerShortID((vni >> 12) & 0xfff))
561×
496
        return srcPeer, dstPeer
561×
497
}
561×
498

499
type vxlanSpecialPacketFlowOp struct {
500
        NonDiscardingFlowOp
501
        fastdp  *FastDatapath
502
        srcPeer *mesh.Peer
503
        sender  *net.UDPAddr
504
}
505

506
func (op vxlanSpecialPacketFlowOp) Process(frame []byte, dec *EthernetDecoder, broadcast bool) {
120×
507
        op.fastdp.lock.Lock()
120×
508
        fwd := op.fastdp.forwarders[op.srcPeer.Name]
120×
509
        op.fastdp.lock.Unlock()
120×
510

120×
511
        if fwd != nil && dec.IsSpecial() {
239×
512
                fwd.handleVxlanSpecialPacket(frame, op.sender)
119×
513
        }
119×
514
}
515

516
type fastDatapathForwarder struct {
517
        fastdp         *FastDatapath
518
        remotePeer     *mesh.Peer
519
        localIP        [4]byte
520
        sendControlMsg func(byte, []byte) error
521
        connUID        uint64
522
        vxlanVportID   odp.VportID
523

524
        lock              sync.RWMutex
525
        confirmed         bool
526
        remoteAddr        *net.UDPAddr
527
        heartbeatInterval time.Duration
528
        heartbeatTimer    *time.Timer
529
        heartbeatTimeout  *time.Timer
530
        ackedHeartbeat    bool
531
        stopChan          chan struct{}
532
        stopped           bool
533

534
        establishedChan chan struct{}
535
        errorChan       chan error
536
}
537

538
func (fastdp fastDatapathOverlay) PrepareConnection(params mesh.OverlayConnectionParams) (mesh.OverlayConnection, error) {
82×
539
        if params.SessionKey != nil {
84×
540
                // No encryption support in fastdp
2×
541
                return nil, fmt.Errorf("encryption not supported")
2×
542
        }
2×
543

544
        vxlanVportID := fastdp.mainVxlanVportID
80×
545
        var remoteAddr *net.UDPAddr
80×
546

80×
547
        if params.Outbound {
122×
548
                remoteAddr = makeUDPAddr(params.RemoteAddr)
42×
549
                // The provided address contains the main weave port
42×
550
                // number to connect to.  We need to derive the vxlan
42×
551
                // port number from that.
42×
552
                vxlanRemoteAddr := *remoteAddr
42×
553
                vxlanRemoteAddr.Port++
42×
554
                remoteAddr = &vxlanRemoteAddr
42×
555
                var err error
42×
556
                vxlanVportID, err = fastdp.getVxlanVportID(remoteAddr.Port)
42×
557
                if err != nil {
42×
558
                        return nil, err
!
559
                }
!
560
        }
561

562
        localIP, err := ipv4Bytes(params.LocalAddr.IP)
80×
563
        if err != nil {
80×
564
                return nil, err
!
565
        }
!
566

567
        fwd := &fastDatapathForwarder{
80×
568
                fastdp:         fastdp.FastDatapath,
80×
569
                remotePeer:     params.RemotePeer,
80×
570
                localIP:        localIP,
80×
571
                sendControlMsg: params.SendControlMessage,
80×
572
                connUID:        params.ConnUID,
80×
573
                vxlanVportID:   vxlanVportID,
80×
574

80×
575
                remoteAddr:        remoteAddr,
80×
576
                heartbeatInterval: FastHeartbeat,
80×
577
                stopChan:          make(chan struct{}),
80×
578

80×
579
                establishedChan: make(chan struct{}),
80×
580
                errorChan:       make(chan error, 1),
80×
581
        }
80×
582

80×
583
        return fwd, err
80×
584
}
585

586
func ipv4Bytes(ip net.IP) (res [4]byte, err error) {
695×
587
        ipv4 := ip.To4()
695×
588
        if ipv4 != nil {
1,390×
589
                copy(res[:], ipv4)
695×
590
        } else {
695×
591
                err = fmt.Errorf("IP address %s is not IPv4", ip)
!
592
        }
!
593
        return
695×
594
}
595

596
func (fwd *fastDatapathForwarder) logPrefix() string {
464×
597
        return fmt.Sprintf("fastdp ->[%s|%s]: ", fwd.remoteAddr, fwd.remotePeer)
464×
598
}
464×
599

600
func (fwd *fastDatapathForwarder) Confirm() {
78×
601
        fwd.lock.Lock()
78×
602
        defer fwd.lock.Unlock()
78×
603

78×
604
        if fwd.confirmed {
78×
605
                log.Fatal(fwd.logPrefix(), "already confirmed")
!
606
        }
!
607

608
        log.Debug(fwd.logPrefix(), "confirmed")
78×
609
        fwd.fastdp.addForwarder(fwd.remotePeer.Name, fwd)
78×
610
        fwd.confirmed = true
78×
611

78×
612
        if fwd.remoteAddr != nil {
118×
613
                // have the goroutine send a heartbeat straight away
40×
614
                fwd.heartbeatTimer = time.NewTimer(0)
40×
615
        } else {
78×
616
                // we'll reset the timer when we learn the remote ip
38×
617
                fwd.heartbeatTimer = time.NewTimer(MaxDuration)
38×
618
        }
38×
619

620
        fwd.heartbeatTimeout = time.NewTimer(HeartbeatTimeout)
78×
621
        go fwd.doHeartbeats()
78×
622
}
623

624
func (fwd *fastDatapathForwarder) EstablishedChannel() <-chan struct{} {
80×
625
        return fwd.establishedChan
80×
626
}
80×
627

628
func (fwd *fastDatapathForwarder) ErrorChannel() <-chan error {
150×
629
        return fwd.errorChan
150×
630
}
150×
631

632
func (fwd *fastDatapathForwarder) doHeartbeats() {
78×
633
        var err error
78×
634

78×
635
        for err == nil {
353×
636
                select {
275×
637
                case <-fwd.heartbeatTimer.C:
197×
638
                        if fwd.confirmed {
394×
639
                                fwd.sendHeartbeat()
197×
640
                        }
197×
641
                        fwd.heartbeatTimer.Reset(fwd.heartbeatInterval)
197×
642

643
                case <-fwd.heartbeatTimeout.C:
!
644
                        err = fmt.Errorf("timed out waiting for vxlan heartbeat")
!
645

646
                case <-fwd.stopChan:
46×
647
                        return
46×
648
                }
649
        }
650

651
        fwd.lock.Lock()
!
652
        defer fwd.lock.Unlock()
!
653
        fwd.handleError(err)
!
654
}
655

656
// Handle an error which leads to notifying the listener and
657
// termination of the forwarder
658
func (fwd *fastDatapathForwarder) handleError(err error) {
70×
659
        if err == nil {
140×
660
                return
70×
661
        }
70×
662

663
        select {
!
664
        case fwd.errorChan <- err:
!
665
        default:
!
666
        }
667

668
        // stop the heartbeat goroutine
669
        if !fwd.stopped {
!
670
                fwd.stopped = true
!
671
                close(fwd.stopChan)
!
672
        }
!
673
}
674

675
func (fwd *fastDatapathForwarder) sendHeartbeat() {
197×
676
        fwd.lock.RLock()
197×
677
        log.Debug(fwd.logPrefix(), "sendHeartbeat")
197×
678

197×
679
        // the heartbeat payload consists of the 64-bit connection uid
197×
680
        // followed by the 16-bit packet size.
197×
681
        buf := make([]byte, EthernetOverhead+fwd.fastdp.iface.MTU)
197×
682
        binary.BigEndian.PutUint64(buf[EthernetOverhead:], fwd.connUID)
197×
683
        binary.BigEndian.PutUint16(buf[EthernetOverhead+8:], uint16(len(buf)))
197×
684

197×
685
        dec := NewEthernetDecoder()
197×
686
        dec.DecodeLayers(buf)
197×
687
        pk := ForwardPacketKey{
197×
688
                PacketKey: dec.PacketKey(),
197×
689
                SrcPeer:   fwd.fastdp.localPeer,
197×
690
                DstPeer:   fwd.remotePeer,
197×
691
        }
197×
692
        fwd.lock.RUnlock()
197×
693

197×
694
        if fop := fwd.Forward(pk); fop != nil {
394×
695
                fop.Process(buf, dec, false)
197×
696
        }
197×
697
}
698

699
const (
700
        FastDatapathHeartbeatAck = iota
701
)
702

703
func (fwd *fastDatapathForwarder) handleVxlanSpecialPacket(frame []byte, sender *net.UDPAddr) {
119×
704
        fwd.lock.Lock()
119×
705
        defer fwd.lock.Unlock()
119×
706

119×
707
        log.Debug(fwd.logPrefix(), "handleVxlanSpecialPacket")
119×
708

119×
709
        // the only special packet type is a heartbeat
119×
710
        if len(frame) < EthernetOverhead+10 {
119×
711
                log.Warning(fwd.logPrefix(), "short vxlan special packet: ", len(frame), " bytes")
!
712
                return
!
713
        }
!
714

715
        if binary.BigEndian.Uint64(frame[EthernetOverhead:]) != fwd.connUID ||
119×
716
                uint16(len(frame)) != binary.BigEndian.Uint16(frame[EthernetOverhead+8:]) {
119×
UNCOV
717
                return
!
UNCOV
718
        }
!
719

720
        if fwd.remoteAddr == nil {
152×
721
                fwd.remoteAddr = sender
33×
722

33×
723
                if fwd.confirmed {
66×
724
                        fwd.heartbeatTimer.Reset(0)
33×
725
                }
33×
726
        } else if !udpAddrsEqual(fwd.remoteAddr, sender) {
86×
727
                log.Info(fwd.logPrefix(), "Peer IP address changed to ", sender)
!
728
                fwd.remoteAddr = sender
!
729
        }
!
730

731
        if !fwd.ackedHeartbeat {
189×
732
                fwd.ackedHeartbeat = true
70×
733
                fwd.handleError(fwd.sendControlMsg(FastDatapathHeartbeatAck, nil))
70×
734
        }
70×
735

736
        // we can receive a heartbeat before Confirm() has set up
737
        // heartbeatTimeout
738
        if fwd.heartbeatTimeout != nil {
238×
739
                fwd.heartbeatTimeout.Reset(HeartbeatTimeout)
119×
740
        }
119×
741
}
742

743
func (fwd *fastDatapathForwarder) ControlMessage(tag byte, msg []byte) {
70×
744
        fwd.lock.Lock()
70×
745
        defer fwd.lock.Unlock()
70×
746

70×
747
        switch tag {
70×
748
        case FastDatapathHeartbeatAck:
70×
749
                fwd.handleHeartbeatAck()
70×
750

751
        default:
!
752
                log.Info(fwd.logPrefix(), "Ignoring unknown control message: ", tag)
!
753
        }
754
}
755

756
func (fwd *fastDatapathForwarder) DisplayName() string {
58×
757
        return "fastdp"
58×
758
}
58×
759

760
func (fwd *fastDatapathForwarder) handleHeartbeatAck() {
70×
761
        log.Debug(fwd.logPrefix(), "handleHeartbeatAck")
70×
762

70×
763
        if fwd.heartbeatInterval != SlowHeartbeat {
140×
764
                close(fwd.establishedChan)
70×
765
                fwd.heartbeatInterval = SlowHeartbeat
70×
766
                if fwd.heartbeatTimer != nil {
140×
767
                        fwd.heartbeatTimer.Reset(fwd.heartbeatInterval)
70×
768
                }
70×
769
        }
770
}
771

772
func (fwd *fastDatapathForwarder) Forward(key ForwardPacketKey) FlowOp {
615×
773
        if !key.SrcPeer.HasShortID || !key.DstPeer.HasShortID {
615×
774
                return nil
!
775
        }
!
776

777
        fwd.lock.RLock()
615×
778
        defer fwd.lock.RUnlock()
615×
779

615×
780
        if fwd.remoteAddr == nil {
615×
781
                // Returning a DiscardingFlowOp would discard the
!
782
                // packet, but also result in a flow rule, which we
!
783
                // would have to invalidate when we learn the remote
!
784
                // IP.  So for now, just prevent flows.
!
785
                return vetoFlowCreationFlowOp{}
!
786
        }
!
787

788
        remoteIP, err := ipv4Bytes(fwd.remoteAddr.IP)
615×
789
        if err != nil {
615×
790
                log.Error(err)
!
791
                return DiscardingFlowOp{}
!
792
        }
!
793

794
        var sta odp.SetTunnelAction
615×
795
        sta.SetTunnelId(tunnelIDFor(key))
615×
796
        sta.SetIpv4Src(fwd.localIP)
615×
797
        sta.SetIpv4Dst(remoteIP)
615×
798
        sta.SetTos(0)
615×
799
        sta.SetTtl(64)
615×
800
        sta.SetDf(true)
615×
801
        sta.SetCsum(false)
615×
802
        return fwd.fastdp.odpActions(sta, odp.NewOutputAction(fwd.vxlanVportID))
615×
803
}
804

805
func tunnelIDFor(key ForwardPacketKey) (tunnelID [8]byte) {
615×
806
        src := uint64(key.SrcPeer.ShortID)
615×
807
        dst := uint64(key.DstPeer.ShortID)
615×
808
        binary.BigEndian.PutUint64(tunnelID[:], src|dst<<12)
615×
809
        return
615×
810
}
615×
811

812
func (fwd *fastDatapathForwarder) Stop() {
48×
813
        // Might be nice to delete all the relevant flows here, but we
48×
814
        // can just let them expire.
48×
815
        fwd.fastdp.removeForwarder(fwd.remotePeer.Name, fwd)
48×
816

48×
817
        fwd.lock.Lock()
48×
818
        defer fwd.lock.Unlock()
48×
819
        fwd.sendControlMsg = func(byte, []byte) error { return nil }
48×
820

821
        // stop the heartbeat goroutine
822
        if !fwd.stopped {
96×
823
                fwd.stopped = true
48×
824
                close(fwd.stopChan)
48×
825
        }
48×
826
}
827

828
func (fastdp *FastDatapath) addForwarder(peer mesh.PeerName, fwd *fastDatapathForwarder) {
78×
829
        fastdp.lock.Lock()
78×
830
        defer fastdp.lock.Unlock()
78×
831

78×
832
        // We shouldn't have two confirmed forwarders to the same
78×
833
        // remotePeer, due to the checks in LocalPeer AddConnection.
78×
834
        fastdp.forwarders[peer] = fwd
78×
835
}
78×
836

837
func (fastdp *FastDatapath) removeForwarder(peer mesh.PeerName, fwd *fastDatapathForwarder) {
48×
838
        fastdp.lock.Lock()
48×
839
        defer fastdp.lock.Unlock()
48×
840
        if fastdp.forwarders[peer] == fwd {
89×
841
                delete(fastdp.forwarders, peer)
41×
842
        }
41×
843
}
844

845
func (fastdp *FastDatapath) deleteFlows() error {
675×
846
        fastdp.deleteFlowsCount++
675×
847

675×
848
        flows, err := fastdp.dp.EnumerateFlows()
675×
849
        if err != nil {
675×
850
                return err
!
851
        }
!
852

853
        for _, flow := range flows {
1,351×
854
                err = fastdp.dp.DeleteFlow(flow.FlowKeys)
676×
855
                if err != nil && !odp.IsNoSuchFlowError(err) {
676×
856
                        return err
!
857
                }
!
858
        }
859

860
        return nil
675×
861
}
862

863
func (fastdp *FastDatapath) deleteVxlanVports() error {
90×
864
        vports, err := fastdp.dp.EnumerateVports()
90×
865
        if err != nil {
90×
866
                return err
!
867
        }
!
868

869
        for _, vport := range vports {
292×
870
                if vport.Spec.TypeName() != "vxlan" {
382×
871
                        continue
180×
872
                }
873

874
                err = fastdp.dp.DeleteVport(vport.ID)
22×
875
                if err != nil && !odp.IsNoSuchVportError(err) {
22×
876
                        return err
!
877
                }
!
878
        }
879

880
        return nil
90×
881
}
882

883
func (fastdp *FastDatapath) run() {
90×
884
        expireMACsCh := time.Tick(10 * time.Minute)
90×
885
        expireFlowsCh := time.Tick(5 * time.Minute)
90×
886

90×
887
        for {
180×
888
                select {
90×
889
                case <-expireMACsCh:
!
890
                        fastdp.expireMACs()
!
891

892
                case <-expireFlowsCh:
!
893
                        fastdp.expireFlows()
!
894
                }
895
        }
896
}
897

898
func (fastdp *FastDatapath) expireMACs() {
!
899
        lock := fastdp.startLock()
!
900
        defer lock.unlock()
!
901

!
902
        for mac := range fastdp.sendToMAC {
!
903
                if _, present := fastdp.seenMACs[mac]; !present {
!
904
                        delete(fastdp.sendToMAC, mac)
!
905
                }
!
906
        }
907

908
        fastdp.seenMACs = make(map[MAC]struct{})
!
909
}
910

911
func (fastdp *FastDatapath) expireFlows() {
!
912
        lock := fastdp.startLock()
!
913
        defer lock.unlock()
!
914

!
915
        flows, err := fastdp.dp.EnumerateFlows()
!
916
        checkWarn(err)
!
917

!
918
        for _, flow := range flows {
!
919
                if flow.Used == 0 {
!
920
                        log.Debug("Expiring flow ", flow.FlowSpec)
!
921
                        err = fastdp.dp.DeleteFlow(flow.FlowKeys)
!
922
                } else {
!
923
                        fastdp.touchFlow(flow.FlowKeys, &lock)
!
924
                        err = fastdp.dp.ClearFlow(flow.FlowSpec)
!
925
                }
!
926

927
                if err != nil && !odp.IsNoSuchFlowError(err) {
!
928
                        log.Warn(err)
!
929
                }
!
930
        }
931
}
932

933
// The router needs to know which flows are active in order to
934
// maintain its MAC->peer table.  We do this by querying the router
935
// without an actual packet being involved.  Maybe it's
936
// worth devising a more unified approach in the future.
937
func (fastdp *FastDatapath) touchFlow(fks odp.FlowKeys, lock *fastDatapathLock) {
!
938
        // All the flows we create should have an ingress key, but we
!
939
        // check here just in case we encounter one from somewhere
!
940
        // else.
!
941
        ingressKey, present := fks[odp.OVS_KEY_ATTR_IN_PORT]
!
942
        if present {
!
943
                ingress := ingressKey.(odp.InPortFlowKey).VportID()
!
944
                handler := fastdp.getMissHandler(ingress)
!
945
                if handler != nil {
!
946
                        handler(fks, lock)
!
947
                        lock.relock()
!
948
                }
!
949
        }
950
}
951

952
func (fastdp *FastDatapath) Error(err error, stopped bool) {
!
953
        if stopped {
!
954
                log.Fatal("Error while listeniing on ODP datapath: ", err)
!
955
        }
!
956

957
        log.Error("Error while listening on ODP datapath: ", err)
!
958
}
959

960
func (fastdp *FastDatapath) Miss(packet []byte, fks odp.FlowKeys) error {
1,354×
961
        ingress := fks[odp.OVS_KEY_ATTR_IN_PORT].(odp.InPortFlowKey).VportID()
1,354×
962

1,354×
963
        lock := fastdp.startLock()
1,354×
964
        defer lock.unlock()
1,354×
965

1,354×
966
        fastdp.missCount++
1,354×
967

1,354×
968
        handler := fastdp.getMissHandler(ingress)
1,354×
969
        if handler == nil {
1,354×
970
                log.Debug("ODP miss (no handler): ", fks, " on port ", ingress)
!
971
                return nil
!
972
        }
!
973

974
        // Always include the ingress vport in the flow key.  While
975
        // this is not strictly necessary in some cases (e.g. for
976
        // delivery to a local netdev based on the dest MAC),
977
        // including the ingress in every flow makes things simpler
978
        // in touchFlow.
979
        mfop := NewMultiFlowOp(false, handler(fks, &lock), odpFlowKey(odp.NewInPortFlowKey(ingress)))
1,354×
980
        fastdp.send(mfop, packet, &lock)
1,354×
981
        return nil
1,354×
982
}
983

984
func (fastdp *FastDatapath) getMissHandler(ingress odp.VportID) missHandler {
1,354×
985
        handler := fastdp.missHandlers[ingress]
1,354×
986
        if handler == nil {
1,354×
987
                vport, err := fastdp.dp.LookupVport(ingress)
!
988
                if err != nil {
!
989
                        log.Error(err)
!
990
                        return nil
!
991
                }
!
992

993
                fastdp.makeBridgeVport(vport)
!
994
        }
995

996
        return handler
1,354×
997
}
998

999
func (fastdp *FastDatapath) VportCreated(dpid odp.DatapathID, vport odp.Vport) error {
!
1000
        fastdp.lock.Lock()
!
1001
        defer fastdp.lock.Unlock()
!
1002

!
1003
        if _, present := fastdp.missHandlers[vport.ID]; !present {
!
1004
                fastdp.makeBridgeVport(vport)
!
1005
        }
!
1006

1007
        return nil
!
1008
}
1009

1010
func (fastdp *FastDatapath) VportDeleted(dpid odp.DatapathID, vport odp.Vport) error {
!
1011
        fastdp.lock.Lock()
!
1012
        defer fastdp.lock.Unlock()
!
1013

!
1014
        // there might be flow rules that still refer to the id of
!
1015
        // this vport.  But we just allow them to expire.  Unless we
!
1016
        // want prompt migration of MAC addresses, that should be
!
1017
        // fine.
!
1018
        delete(fastdp.missHandlers, vport.ID)
!
1019
        fastdp.deleteSendToPort(bridgePortID{vport: vport.ID})
!
1020
        return nil
!
1021
}
!
1022

1023
func (fastdp *FastDatapath) makeBridgeVport(vport odp.Vport) {
270×
1024
        // Set up a bridge port for netdev and internal vports.  vxlan
270×
1025
        // vports are handled separately, as they do not correspond to
270×
1026
        // bridge ports (we set up the miss handler for them in
270×
1027
        // getVxlanVportID).
270×
1028
        typ := vport.Spec.TypeName()
270×
1029
        if typ != "netdev" && typ != "internal" {
360×
1030
                return
90×
1031
        }
90×
1032

1033
        vportID := vport.ID
180×
1034

180×
1035
        // Sending to the bridge port outputs on the vport:
180×
1036
        fastdp.addSendToPort(bridgePortID{vport: vportID},
180×
1037
                func(_ PacketKey, _ *fastDatapathLock) FlowOp {
1,820×
1038
                        return fastdp.odpActions(odp.NewOutputAction(vportID))
1,640×
1039
                })
1,640×
1040

1041
        // Delete flows, in order to recalculate flows for broadcasts
1042
        // on the bridge.
1043
        checkWarn(fastdp.deleteFlows())
180×
1044

180×
1045
        // Packets coming from the netdev are processed by the bridge
180×
1046
        fastdp.missHandlers[vportID] = func(flowKeys odp.FlowKeys, lock *fastDatapathLock) FlowOp {
973×
1047
                return fastdp.bridge(bridgePortID{vport: vportID}, flowKeysToPacketKey(flowKeys), lock)
793×
1048
        }
793×
1049
}
1050

1051
func flowKeysToPacketKey(fks odp.FlowKeys) PacketKey {
1,337×
1052
        eth := fks[odp.OVS_KEY_ATTR_ETHERNET].(odp.EthernetFlowKey).Key()
1,337×
1053
        return PacketKey{SrcMAC: eth.EthSrc, DstMAC: eth.EthDst}
1,337×
1054
}
1,337×
1055

1056
// The sendToPort map is read-only, so this method does the copy in
1057
// order to add an entry.
1058
func (fastdp *FastDatapath) addSendToPort(portID bridgePortID, sender bridgeSender) {
270×
1059
        sendToPort := map[bridgePortID]bridgeSender{portID: sender}
270×
1060
        for id, sender := range fastdp.sendToPort {
540×
1061
                sendToPort[id] = sender
270×
1062
        }
270×
1063
        fastdp.sendToPort = sendToPort
270×
1064
}
1065

1066
func (fastdp *FastDatapath) deleteSendToPort(portID bridgePortID) {
!
1067
        sendToPort := make(map[bridgePortID]bridgeSender)
!
1068
        for id, sender := range fastdp.sendToPort {
!
1069
                if id != portID {
!
1070
                        sendToPort[id] = sender
!
1071
                }
!
1072
        }
1073
        fastdp.sendToPort = sendToPort
!
1074
}
1075

1076
// Send a packet, creating a corresponding ODP flow rule if possible
1077
func (fastdp *FastDatapath) send(fops FlowOp, frame []byte, lock *fastDatapathLock) {
1,354×
1078
        // Gather the actions from actionFlowOps, execute any others
1,354×
1079
        var dec *EthernetDecoder
1,354×
1080
        flow := odp.NewFlowSpec()
1,354×
1081
        createFlow := true
1,354×
1082

1,354×
1083
        for _, xfop := range FlattenFlowOp(fops) {
7,325×
1084
                switch fop := xfop.(type) {
5,971×
1085
                case interface {
4,938×
1086
                        updateFlowSpec(*odp.FlowSpec)
4,938×
1087
                }:
4,938×
1088
                        fop.updateFlowSpec(&flow)
4,938×
1089
                case vetoFlowCreationFlowOp:
18×
1090
                        createFlow = false
18×
1091
                default:
1,015×
1092
                        if xfop.Discards() {
1,853×
1093
                                continue
838×
1094
                        }
1095

1096
                        // A foreign FlowOp (e.g. a sleeve forwarding
1097
                        // FlowOp), so send the packet through the
1098
                        // FlowOp interface, decoding the packet
1099
                        // lazily.
1100
                        if dec == nil {
354×
1101
                                dec = fastdp.takeDecoder(lock)
177×
1102
                                dec.DecodeLayers(frame)
177×
1103

177×
1104
                                // If we are sending the packet
177×
1105
                                // through the FlowOp interface, we
177×
1106
                                // mustn't create a flow, as that
177×
1107
                                // could prevent the proper handling
177×
1108
                                // of similar packets in the future.
177×
1109
                                createFlow = false
177×
1110
                        }
177×
1111

1112
                        if len(dec.decoded) != 0 {
354×
1113
                                lock.unlock()
177×
1114
                                fop.Process(frame, dec, false)
177×
1115
                        }
177×
1116
                }
1117
        }
1118

1119
        if dec != nil {
1,531×
1120
                // put the decoder back
177×
1121
                lock.relock()
177×
1122
                fastdp.dec = dec
177×
1123
        }
177×
1124

1125
        if len(flow.Actions) != 0 {
2,532×
1126
                lock.relock()
1,178×
1127
                checkWarn(fastdp.dp.Execute(frame, nil, flow.Actions))
1,178×
1128
        }
1,178×
1129

1130
        if createFlow {
2,513×
1131
                lock.relock()
1,159×
1132
                // if the fastdp's deleteFlowsCount changed since we
1,159×
1133
                // initially locked it, then we might have created a
1,159×
1134
                // flow on the basis of stale information.  It's fine
1,159×
1135
                // to handle one packet like that, but it would be bad
1,159×
1136
                // to introduce a stale flow.
1,159×
1137
                if lock.deleteFlowsCount == fastdp.deleteFlowsCount {
2,318×
1138
                        log.Debug("Creating ODP flow ", flow)
1,159×
1139
                        checkWarn(fastdp.dp.CreateFlow(flow))
1,159×
1140
                }
1,159×
1141
        }
1142
}
1143

1144
// Get the EthernetDecoder from the singleton pool
1145
func (fastdp *FastDatapath) takeDecoder(lock *fastDatapathLock) *EthernetDecoder {
177×
1146
        lock.relock()
177×
1147
        dec := fastdp.dec
177×
1148
        if dec == nil {
227×
1149
                dec = NewEthernetDecoder()
50×
1150
        } else {
177×
1151
                fastdp.dec = nil
127×
1152
        }
127×
1153
        return dec
177×
1154
}
1155

1156
type odpActionsFlowOp struct {
1157
        NonDiscardingFlowOp
1158
        fastdp  *FastDatapath
1159
        actions []odp.Action
1160
}
1161

1162
func (fastdp *FastDatapath) odpActions(actions ...odp.Action) FlowOp {
2,255×
1163
        return odpActionsFlowOp{
2,255×
1164
                fastdp:  fastdp,
2,255×
1165
                actions: actions,
2,255×
1166
        }
2,255×
1167
}
2,255×
1168

1169
func (fop odpActionsFlowOp) updateFlowSpec(flow *odp.FlowSpec) {
1,952×
1170
        flow.AddActions(fop.actions)
1,952×
1171
}
1,952×
1172

1173
func (fop odpActionsFlowOp) Process(frame []byte, dec *EthernetDecoder, broadcast bool) {
303×
1174
        fastdp := fop.fastdp
303×
1175
        fastdp.lock.Lock()
303×
1176
        defer fastdp.lock.Unlock()
303×
1177
        checkWarn(fastdp.dp.Execute(frame, nil, fop.actions))
303×
1178
}
303×
1179

1180
// A vetoFlowCreationFlowOp flags that no flow should be created
1181
type vetoFlowCreationFlowOp struct {
1182
        DiscardingFlowOp
1183
}
1184

1185
// A odpFlowKeyFlowOp adds a FlowKey to the resulting flow
1186
type odpFlowKeyFlowOp struct {
1187
        DiscardingFlowOp
1188
        key odp.FlowKey
1189
}
1190

1191
func odpFlowKey(key odp.FlowKey) FlowOp {
1,778×
1192
        return odpFlowKeyFlowOp{key: key}
1,778×
1193
}
1,778×
1194

1195
func (fop odpFlowKeyFlowOp) updateFlowSpec(flow *odp.FlowSpec) {
2,986×
1196
        flow.AddKey(fop.key)
2,986×
1197
}
2,986×
1198

1199
func odpEthernetFlowKey(key PacketKey) FlowOp {
1,293×
1200
        fk := odp.NewEthernetFlowKey()
1,293×
1201
        fk.SetEthSrc(key.SrcMAC)
1,293×
1202
        fk.SetEthDst(key.DstMAC)
1,293×
1203
        return odpFlowKeyFlowOp{key: fk}
1,293×
1204
}
1,293×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc