• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

danlapid / oneway-filesync / 4268588603

pending completion
4268588603

Pull #35

github

GitHub
Merge d8e61ff25 into 10e600382
Pull Request #35: Bump golang.org/x/sys from 0.0.0-20200323222414-85ca7c5b95cd to 0.1.0 in /fecbench

761 of 803 relevant lines covered (94.77%)

785.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.75
/pkg/udpreceiver/udpreceiver.go
1
package udpreceiver
2

3
import (
4
        "context"
5
        "errors"
6
        "net"
7
        "oneway-filesync/pkg/structs"
8
        "time"
9

10
        "github.com/danlapid/socketbuffer"
11
        "github.com/sirupsen/logrus"
12
)
13

14
type udpReceiverConfig struct {
15
        conn      *net.UDPConn
16
        chunksize int
17
        output    chan *structs.Chunk
18
}
19

20
func manager(ctx context.Context, conf *udpReceiverConfig) {
6✔
21
        ticker := time.NewTicker(200 * time.Millisecond)
6✔
22
        rawconn, err := conf.conn.SyscallConn()
6✔
23
        if err != nil {
7✔
24
                logrus.Errorf("Error getting raw socket: %v", err)
1✔
25
                return
1✔
26
        }
1✔
27
        bufsize, err := socketbuffer.GetReadBuffer(rawconn)
5✔
28
        if err != nil {
5✔
29
                logrus.Errorf("Error getting read buffer size: %v", err)
×
30
                return
×
31
        }
×
32

33
        for {
1,250✔
34
                select {
1,245✔
35
                case <-ctx.Done():
5✔
36
                        return
5✔
37
                case <-ticker.C:
1,240✔
38
                        toread, err := socketbuffer.GetAvailableBytes(rawconn)
1,240✔
39
                        if err != nil {
1,240✔
40
                                logrus.Errorf("Error getting available bytes on socket: %v", err)
×
41
                                continue
×
42
                        }
43

44
                        if float64(toread)/float64(bufsize) > 0.8 {
1,250✔
45
                                logrus.Errorf("Buffers are filling up loss of data is probable")
10✔
46
                        }
10✔
47
                }
48
        }
49
}
50

51
func worker(ctx context.Context, conf *udpReceiverConfig) {
20✔
52
        buf := make([]byte, conf.chunksize)
20✔
53

20✔
54
        for {
43,152✔
55
                select {
43,132✔
56
                case <-ctx.Done():
19✔
57
                        return
19✔
58
                default:
43,113✔
59
                        // conn.Close will interrupt any waiting ReadFromUDP
43,113✔
60
                        n, _, err := conf.conn.ReadFromUDP(buf)
43,113✔
61
                        if err != nil {
85,184✔
62
                                if errors.Is(err, net.ErrClosed) {
84,141✔
63
                                        // conn.Close was called
42,070✔
64
                                        continue
42,070✔
65
                                }
66
                                logrus.Errorf("Error reading from socket: %v", err)
1✔
67
                                return
1✔
68
                        }
69
                        chunk, err := structs.DecodeChunk(buf[:n])
1,042✔
70
                        if err != nil {
1,043✔
71
                                logrus.Errorf("Error decoding chunk: %v", err)
1✔
72
                                continue
1✔
73
                        }
74
                        conf.output <- &chunk
1,041✔
75
                }
76
        }
77
}
78

79
func CreateUdpReceiver(ctx context.Context, ip string, port int, chunksize int, output chan *structs.Chunk, workercount int) {
5✔
80
        addr := net.UDPAddr{
5✔
81
                IP:   net.ParseIP(ip),
5✔
82
                Port: port,
5✔
83
        }
5✔
84

5✔
85
        conn, err := net.ListenUDP("udp", &addr)
5✔
86
        if err != nil {
6✔
87
                logrus.Errorf("Error creating udp socket: %v", err)
1✔
88
                return
1✔
89
        }
1✔
90
        go func() {
8✔
91
                <-ctx.Done()
4✔
92
                conn.Close()
4✔
93
        }()
4✔
94

95
        conf := udpReceiverConfig{
4✔
96
                conn:      conn,
4✔
97
                chunksize: chunksize,
4✔
98
                output:    output,
4✔
99
        }
4✔
100
        for i := 0; i < workercount; i++ {
20✔
101
                go worker(ctx, &conf)
16✔
102
        }
16✔
103
        go manager(ctx, &conf)
4✔
104
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc