• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

foomo / contentserver / 12019324254

25 Nov 2024 09:43PM UTC coverage: 62.018%. First build
12019324254

push

github

web-flow
Merge pull request #31 from foomo/v1.11.x

Release v1.11.x

852 of 1452 new or added lines in 19 files covered. (58.68%)

1125 of 1814 relevant lines covered (62.02%)

12.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.82
/pkg/handler/socket.go
1
package handler
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "net"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "github.com/foomo/contentserver/requests"
14
        "go.uber.org/zap"
15

16
        "github.com/foomo/contentserver/pkg/metrics"
17
        "github.com/foomo/contentserver/pkg/repo"
18
        "github.com/foomo/contentserver/responses"
19
)
20

21
const sourceSocketServer = "socketserver"
22

23
type Socket struct {
24
        l    *zap.Logger
25
        repo *repo.Repo
26
}
27

28
// ------------------------------------------------------------------------------------------------
29
// ~ Constructor
30
// ------------------------------------------------------------------------------------------------
31

32
// NewSocket returns a shiny new socket server
33
func NewSocket(l *zap.Logger, repo *repo.Repo) *Socket {
5✔
34
        inst := &Socket{
5✔
35
                l:    l.Named("socket"),
5✔
36
                repo: repo,
5✔
37
        }
5✔
38

5✔
39
        return inst
5✔
40
}
5✔
41

42
// ------------------------------------------------------------------------------------------------
43
// ~ Public methods
44
// ------------------------------------------------------------------------------------------------
45

46
func (h *Socket) Serve(conn net.Conn) {
125✔
47
        defer func() {
225✔
48
                if r := recover(); r != nil {
100✔
NEW
49
                        if err, ok := r.(error); ok {
×
NEW
50
                                if !errors.Is(err, io.EOF) {
×
NEW
51
                                        h.l.Error("panic in handle connection", zap.Error(err))
×
NEW
52
                                }
×
NEW
53
                        } else {
×
NEW
54
                                h.l.Error("panic in handle connection", zap.String("error", fmt.Sprint(r)))
×
NEW
55
                        }
×
56
                }
57
        }()
58

59
        h.l.Debug("socketServer.handleConnection")
125✔
60
        metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Inc()
125✔
61

125✔
62
        var (
125✔
63
                headerBuffer [1]byte
125✔
64
                header       = ""
125✔
65
                i            = 0
125✔
66
        )
125✔
67
        for {
308✔
68
                i++
183✔
69
                // fmt.Println("---->", i)
183✔
70
                // let us read with 1 byte steps on conn until we find "{"
183✔
71
                _, readErr := conn.Read(headerBuffer[0:])
183✔
72
                if readErr != nil {
283✔
73
                        h.l.Debug("looks like the client closed the connection", zap.Error(readErr))
100✔
74
                        metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Dec()
100✔
75
                        return
100✔
76
                }
100✔
77
                // read next byte
78
                current := headerBuffer[0:]
58✔
79
                if string(current) == "{" {
63✔
80
                        // json has started
5✔
81
                        handler, jsonLength, headerErr := h.extractHandlerAndJSONLentgh(header)
5✔
82
                        // reset header
5✔
83
                        header = ""
5✔
84
                        if headerErr != nil {
5✔
NEW
85
                                h.l.Error("invalid request could not read header", zap.Error(headerErr))
×
NEW
86
                                encodedErr, encodingErr := h.encodeReply(responses.NewError(4, "invalid header "+headerErr.Error()))
×
NEW
87
                                if encodingErr == nil {
×
NEW
88
                                        h.writeResponse(conn, encodedErr)
×
NEW
89
                                } else {
×
NEW
90
                                        h.l.Error("could not respond to invalid request", zap.Error(encodingErr))
×
NEW
91
                                }
×
NEW
92
                                return
×
93
                        }
94
                        h.l.Debug("found json", zap.Int("length", jsonLength))
5✔
95
                        if jsonLength > 0 {
10✔
96
                                var (
5✔
97
                                        // let us try to read some json
5✔
98
                                        jsonBytes         = make([]byte, jsonLength)
5✔
99
                                        jsonLengthCurrent = 1
5✔
100
                                        readRound         = 0
5✔
101
                                )
5✔
102

5✔
103
                                // that is "{"
5✔
104
                                jsonBytes[0] = 123
5✔
105

5✔
106
                                for jsonLengthCurrent < jsonLength {
10✔
107
                                        readRound++
5✔
108
                                        readLength, jsonReadErr := conn.Read(jsonBytes[jsonLengthCurrent:jsonLength])
5✔
109
                                        if jsonReadErr != nil {
5✔
NEW
110
                                                // @fixme we need to force a read timeout (SetReadDeadline?), if expected jsonLength is lower than really sent bytes (e.g. if client implements protocol wrong)
×
NEW
111
                                                // @todo should we check for io.EOF here
×
NEW
112
                                                h.l.Error("could not read json - giving up with this client connection", zap.Error(jsonReadErr))
×
NEW
113
                                                metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Dec()
×
NEW
114
                                                return
×
NEW
115
                                        }
×
116
                                        jsonLengthCurrent += readLength
5✔
117
                                        h.l.Debug("read cycle status",
5✔
118
                                                zap.Int("jsonLengthCurrent", jsonLengthCurrent),
5✔
119
                                                zap.Int("jsonLength", jsonLength),
5✔
120
                                                zap.Int("readRound", readRound),
5✔
121
                                        )
5✔
122
                                }
123

124
                                h.l.Debug("read json", zap.Int("length", len(jsonBytes)))
5✔
125

5✔
126
                                h.writeResponse(conn, h.execute(handler, jsonBytes))
5✔
127
                                // note: connection remains open
5✔
128
                                continue
5✔
129
                        }
NEW
130
                        h.l.Error("can not read empty json")
×
NEW
131
                        metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Dec()
×
NEW
132
                        return
×
133
                }
134
                // adding to header byte by byte
135
                header += string(headerBuffer[0:])
53✔
136
        }
137
}
138

139
// ------------------------------------------------------------------------------------------------
140
// ~ Private methods
141
// ------------------------------------------------------------------------------------------------
142

143
func (h *Socket) extractHandlerAndJSONLentgh(header string) (route Route, jsonLength int, err error) {
5✔
144
        headerParts := strings.Split(header, ":")
5✔
145
        if len(headerParts) != 2 {
5✔
NEW
146
                return "", 0, errors.New("invalid header")
×
NEW
147
        }
×
148
        jsonLength, err = strconv.Atoi(headerParts[1])
5✔
149
        if err != nil {
5✔
NEW
150
                err = fmt.Errorf("could not parse length in header: %q", header)
×
NEW
151
        }
×
152
        return Route(headerParts[0]), jsonLength, err
5✔
153
}
154

155
func (h *Socket) execute(route Route, jsonBytes []byte) (reply []byte) {
5✔
156
        h.l.Debug("incoming json buffer", zap.Int("length", len(jsonBytes)))
5✔
157

5✔
158
        if route == RouteGetRepo {
6✔
159
                var (
1✔
160
                        b bytes.Buffer
1✔
161
                )
1✔
162
                h.repo.WriteRepoBytes(&b)
1✔
163
                return b.Bytes()
1✔
164
        }
1✔
165

166
        reply, handlingError := h.handleRequest(h.repo, route, jsonBytes, sourceSocketServer)
4✔
167
        if handlingError != nil {
4✔
NEW
168
                h.l.Error("socketServer.execute failed", zap.Error(handlingError))
×
NEW
169
        }
×
170
        return reply
4✔
171
}
172

173
func (h *Socket) writeResponse(conn net.Conn, reply []byte) {
5✔
174
        headerBytes := []byte(strconv.Itoa(len(reply)))
5✔
175
        reply = append(headerBytes, reply...)
5✔
176
        h.l.Debug("replying", zap.String("reply", string(reply)))
5✔
177
        n, writeError := conn.Write(reply)
5✔
178
        if writeError != nil {
5✔
NEW
179
                h.l.Error("socketServer.writeResponse: could not write reply", zap.Error(writeError))
×
NEW
180
                return
×
NEW
181
        }
×
182
        if n < len(reply) {
5✔
NEW
183
                h.l.Error("socketServer.writeResponse: write too short",
×
NEW
184
                        zap.Int("got", n),
×
NEW
185
                        zap.Int("expected", len(reply)),
×
NEW
186
                )
×
NEW
187
                return
×
NEW
188
        }
×
189
        h.l.Debug("replied. waiting for next request on open connection")
5✔
190
}
191

192
func (h *Socket) handleRequest(r *repo.Repo, route Route, jsonBytes []byte, source string) ([]byte, error) {
4✔
193
        start := time.Now()
4✔
194

4✔
195
        reply, err := h.executeRequest(r, route, jsonBytes, source)
4✔
196
        result := "success"
4✔
197
        if err != nil {
4✔
NEW
198
                result = "error"
×
NEW
199
        }
×
200

201
        metrics.ServiceRequestCounter.WithLabelValues(string(route), result, source).Inc()
4✔
202
        metrics.ServiceRequestDuration.WithLabelValues(string(route), result, source).Observe(time.Since(start).Seconds())
4✔
203

4✔
204
        return reply, err
4✔
205
}
206

207
func (h *Socket) executeRequest(r *repo.Repo, route Route, jsonBytes []byte, source string) (replyBytes []byte, err error) {
4✔
208
        var (
4✔
209
                reply             interface{}
4✔
210
                apiErr            error
4✔
211
                jsonErr           error
4✔
212
                processIfJSONIsOk = func(err error, processingFunc func()) {
8✔
213
                        if err != nil {
4✔
NEW
214
                                jsonErr = err
×
NEW
215
                                return
×
NEW
216
                        }
×
217
                        processingFunc()
4✔
218
                }
219
        )
220
        metrics.ContentRequestCounter.WithLabelValues(source).Inc()
4✔
221

4✔
222
        // handle and process
4✔
223
        switch route {
4✔
224
        // case RouteGetRepo: // This case is handled prior to handleRequest being called.
225
        // since the resulting bytes are written directly in to the http.ResponseWriter / net.Connection
226
        case RouteGetURIs:
1✔
227
                getURIRequest := &requests.URIs{}
1✔
228
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &getURIRequest), func() {
2✔
229
                        reply = r.GetURIs(getURIRequest.Dimension, getURIRequest.IDs)
1✔
230
                })
1✔
231
        case RouteGetContent:
1✔
232
                contentRequest := &requests.Content{}
1✔
233
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &contentRequest), func() {
2✔
234
                        reply, apiErr = r.GetContent(contentRequest)
1✔
235
                })
1✔
236
        case RouteGetNodes:
1✔
237
                nodesRequest := &requests.Nodes{}
1✔
238
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &nodesRequest), func() {
2✔
239
                        reply = r.GetNodes(nodesRequest)
1✔
240
                })
1✔
241
        case RouteUpdate:
1✔
242
                updateRequest := &requests.Update{}
1✔
243
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &updateRequest), func() {
2✔
244
                        reply = r.Update()
1✔
245
                })
1✔
246

NEW
247
        default:
×
NEW
248
                reply = responses.NewError(1, "unknown handler: "+string(route))
×
249
        }
250

251
        // error handling
252
        if jsonErr != nil {
4✔
NEW
253
                h.l.Error("could not read incoming json", zap.Error(jsonErr))
×
NEW
254
                reply = responses.NewError(2, "could not read incoming json "+jsonErr.Error())
×
255
        } else if apiErr != nil {
4✔
NEW
256
                h.l.Error("an API error occurred", zap.Error(apiErr))
×
NEW
257
                reply = responses.NewError(3, "internal error "+apiErr.Error())
×
NEW
258
        }
×
259

260
        return h.encodeReply(reply)
4✔
261
}
262

263
// encodeReply takes an interface and encodes it as JSON
264
// it returns the resulting JSON and a marshalling error
265
func (h *Socket) encodeReply(reply interface{}) (replyBytes []byte, err error) {
4✔
266
        replyBytes, err = json.Marshal(map[string]interface{}{
4✔
267
                "reply": reply,
4✔
268
        })
4✔
269
        if err != nil {
4✔
NEW
270
                h.l.Error("could not encode reply", zap.Error(err))
×
NEW
271
        }
×
272
        return
4✔
273
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc