• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

foomo / contentserver / 15108131092

19 May 2025 08:29AM UTC coverage: 41.461% (-20.9%) from 62.396%
15108131092

push

github

web-flow
Merge pull request #49 from foomo/feature/build-with-safe-tag

feat: build with safe tag

15 of 43 new or added lines in 5 files covered. (34.88%)

376 existing lines in 11 files now uncovered.

755 of 1821 relevant lines covered (41.46%)

9.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/handler/socket.go
1
package handler
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "net"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "github.com/foomo/contentserver/requests"
14
        "go.uber.org/zap"
15

16
        "github.com/foomo/contentserver/pkg/metrics"
17
        "github.com/foomo/contentserver/pkg/repo"
18
        "github.com/foomo/contentserver/responses"
19
)
20

21
const sourceSocketServer = "socketserver"
22

23
type Socket struct {
24
        l    *zap.Logger
25
        repo *repo.Repo
26
}
27

28
// ------------------------------------------------------------------------------------------------
29
// ~ Constructor
30
// ------------------------------------------------------------------------------------------------
31

32
// NewSocket returns a shiny new socket server
UNCOV
33
func NewSocket(l *zap.Logger, repo *repo.Repo) *Socket {
×
UNCOV
34
        inst := &Socket{
×
UNCOV
35
                l:    l.Named("socket"),
×
UNCOV
36
                repo: repo,
×
UNCOV
37
        }
×
UNCOV
38

×
UNCOV
39
        return inst
×
UNCOV
40
}
×
41

42
// ------------------------------------------------------------------------------------------------
43
// ~ Public methods
44
// ------------------------------------------------------------------------------------------------
45

UNCOV
46
func (h *Socket) Serve(conn net.Conn) {
×
UNCOV
47
        defer func() {
×
UNCOV
48
                if r := recover(); r != nil {
×
49
                        if err, ok := r.(error); ok {
×
50
                                if !errors.Is(err, io.EOF) {
×
51
                                        h.l.Error("panic in handle connection", zap.Error(err))
×
52
                                }
×
53
                        } else {
×
54
                                h.l.Error("panic in handle connection", zap.String("error", fmt.Sprint(r)))
×
55
                        }
×
56
                }
57
        }()
58

59
        // h.l.Debug("socketServer.handleConnection")
UNCOV
60
        metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Inc()
×
UNCOV
61

×
UNCOV
62
        var (
×
UNCOV
63
                headerBuffer [1]byte
×
UNCOV
64
                header       = ""
×
UNCOV
65
        )
×
UNCOV
66
        for {
×
UNCOV
67
                // let us read with 1 byte steps on conn until we find "{"
×
UNCOV
68
                _, readErr := conn.Read(headerBuffer[0:])
×
NEW
69
                if errors.Is(readErr, io.EOF) {
×
NEW
70
                        // client closed the connection
×
UNCOV
71
                        metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Dec()
×
UNCOV
72
                        return
×
NEW
73
                } else if readErr != nil {
×
NEW
74
                        h.l.Error("failed to read from connection", zap.Error(readErr))
×
NEW
75
                        return
×
UNCOV
76
                }
×
77
                // read next byte
UNCOV
78
                current := headerBuffer[0:]
×
UNCOV
79
                if string(current) == "{" {
×
UNCOV
80
                        // json has started
×
UNCOV
81
                        handler, jsonLength, headerErr := h.extractHandlerAndJSONLentgh(header)
×
UNCOV
82
                        // reset header
×
UNCOV
83
                        header = ""
×
UNCOV
84
                        if headerErr != nil {
×
85
                                h.l.Error("invalid request could not read header", zap.Error(headerErr))
×
86
                                encodedErr, encodingErr := h.encodeReply(responses.NewError(4, "invalid header "+headerErr.Error()))
×
87
                                if encodingErr == nil {
×
88
                                        h.writeResponse(conn, encodedErr)
×
89
                                } else {
×
90
                                        h.l.Error("could not respond to invalid request", zap.Error(encodingErr))
×
91
                                }
×
92
                                return
×
93
                        }
UNCOV
94
                        h.l.Debug("found json", zap.Int("length", jsonLength))
×
UNCOV
95
                        if jsonLength > 0 {
×
UNCOV
96
                                var (
×
UNCOV
97
                                        // let us try to read some json
×
UNCOV
98
                                        jsonBytes         = make([]byte, jsonLength)
×
UNCOV
99
                                        jsonLengthCurrent = 1
×
UNCOV
100
                                        readRound         = 0
×
UNCOV
101
                                )
×
UNCOV
102

×
UNCOV
103
                                // that is "{"
×
UNCOV
104
                                jsonBytes[0] = 123
×
UNCOV
105

×
UNCOV
106
                                for jsonLengthCurrent < jsonLength {
×
UNCOV
107
                                        readRound++
×
UNCOV
108
                                        readLength, jsonReadErr := conn.Read(jsonBytes[jsonLengthCurrent:jsonLength])
×
UNCOV
109
                                        if jsonReadErr != nil {
×
110
                                                // @fixme we need to force a read timeout (SetReadDeadline?), if expected jsonLength is lower than really sent bytes (e.g. if client implements protocol wrong)
×
111
                                                // @todo should we check for io.EOF here
×
112
                                                h.l.Error("could not read json - giving up with this client connection", zap.Error(jsonReadErr))
×
113
                                                metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Dec()
×
114
                                                return
×
115
                                        }
×
UNCOV
116
                                        jsonLengthCurrent += readLength
×
UNCOV
117
                                        h.l.Debug("read cycle status",
×
UNCOV
118
                                                zap.Int("jsonLengthCurrent", jsonLengthCurrent),
×
UNCOV
119
                                                zap.Int("jsonLength", jsonLength),
×
UNCOV
120
                                                zap.Int("readRound", readRound),
×
UNCOV
121
                                        )
×
122
                                }
123

UNCOV
124
                                h.l.Debug("read json", zap.Int("length", len(jsonBytes)))
×
UNCOV
125

×
UNCOV
126
                                h.writeResponse(conn, h.execute(handler, jsonBytes))
×
UNCOV
127
                                // note: connection remains open
×
UNCOV
128
                                continue
×
129
                        }
130
                        h.l.Error("can not read empty json")
×
131
                        metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Dec()
×
132
                        return
×
133
                }
134
                // adding to header byte by byte
UNCOV
135
                header += string(headerBuffer[0:])
×
136
        }
137
}
138

139
// ------------------------------------------------------------------------------------------------
140
// ~ Private methods
141
// ------------------------------------------------------------------------------------------------
142

UNCOV
143
func (h *Socket) extractHandlerAndJSONLentgh(header string) (route Route, jsonLength int, err error) {
×
UNCOV
144
        headerParts := strings.Split(header, ":")
×
UNCOV
145
        if len(headerParts) != 2 {
×
146
                return "", 0, errors.New("invalid header")
×
147
        }
×
UNCOV
148
        jsonLength, err = strconv.Atoi(headerParts[1])
×
UNCOV
149
        if err != nil {
×
150
                err = fmt.Errorf("could not parse length in header: %q", header)
×
151
        }
×
UNCOV
152
        return Route(headerParts[0]), jsonLength, err
×
153
}
154

UNCOV
155
func (h *Socket) execute(route Route, jsonBytes []byte) (reply []byte) {
×
UNCOV
156
        h.l.Debug("incoming json buffer", zap.Int("length", len(jsonBytes)))
×
UNCOV
157

×
UNCOV
158
        if route == RouteGetRepo {
×
UNCOV
159
                var (
×
UNCOV
160
                        b bytes.Buffer
×
UNCOV
161
                )
×
UNCOV
162
                h.repo.WriteRepoBytes(&b)
×
UNCOV
163
                return b.Bytes()
×
UNCOV
164
        }
×
165

UNCOV
166
        reply, handlingError := h.handleRequest(h.repo, route, jsonBytes, sourceSocketServer)
×
UNCOV
167
        if handlingError != nil {
×
168
                h.l.Error("socketServer.execute failed", zap.Error(handlingError))
×
169
        }
×
UNCOV
170
        return reply
×
171
}
172

UNCOV
173
func (h *Socket) writeResponse(conn net.Conn, reply []byte) {
×
UNCOV
174
        headerBytes := []byte(strconv.Itoa(len(reply)))
×
UNCOV
175
        reply = append(headerBytes, reply...)
×
UNCOV
176
        h.l.Debug("replying", zap.String("reply", string(reply)))
×
UNCOV
177
        n, writeError := conn.Write(reply)
×
UNCOV
178
        if writeError != nil {
×
179
                h.l.Error("socketServer.writeResponse: could not write reply", zap.Error(writeError))
×
180
                return
×
181
        }
×
UNCOV
182
        if n < len(reply) {
×
183
                h.l.Error("socketServer.writeResponse: write too short",
×
184
                        zap.Int("got", n),
×
185
                        zap.Int("expected", len(reply)),
×
186
                )
×
187
                return
×
188
        }
×
UNCOV
189
        h.l.Debug("replied. waiting for next request on open connection")
×
190
}
191

UNCOV
192
func (h *Socket) handleRequest(r *repo.Repo, route Route, jsonBytes []byte, source string) ([]byte, error) {
×
UNCOV
193
        start := time.Now()
×
UNCOV
194

×
UNCOV
195
        reply, err := h.executeRequest(r, route, jsonBytes, source)
×
UNCOV
196
        result := "success"
×
UNCOV
197
        if err != nil {
×
198
                result = "error"
×
199
        }
×
200

UNCOV
201
        metrics.ServiceRequestCounter.WithLabelValues(string(route), result, source).Inc()
×
UNCOV
202
        metrics.ServiceRequestDuration.WithLabelValues(string(route), result, source).Observe(time.Since(start).Seconds())
×
UNCOV
203

×
UNCOV
204
        return reply, err
×
205
}
206

UNCOV
207
func (h *Socket) executeRequest(r *repo.Repo, route Route, jsonBytes []byte, source string) (replyBytes []byte, err error) {
×
UNCOV
208
        var (
×
UNCOV
209
                reply             interface{}
×
UNCOV
210
                apiErr            error
×
UNCOV
211
                jsonErr           error
×
UNCOV
212
                processIfJSONIsOk = func(err error, processingFunc func()) {
×
UNCOV
213
                        if err != nil {
×
214
                                jsonErr = err
×
215
                                return
×
216
                        }
×
UNCOV
217
                        processingFunc()
×
218
                }
219
        )
UNCOV
220
        metrics.ContentRequestCounter.WithLabelValues(source).Inc()
×
UNCOV
221

×
UNCOV
222
        // handle and process
×
UNCOV
223
        switch route {
×
224
        // case RouteGetRepo: // This case is handled prior to handleRequest being called.
225
        // since the resulting bytes are written directly in to the http.ResponseWriter / net.Connection
UNCOV
226
        case RouteGetURIs:
×
UNCOV
227
                getURIRequest := &requests.URIs{}
×
UNCOV
228
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &getURIRequest), func() {
×
UNCOV
229
                        reply = r.GetURIs(getURIRequest.Dimension, getURIRequest.IDs)
×
UNCOV
230
                })
×
UNCOV
231
        case RouteGetContent:
×
UNCOV
232
                contentRequest := &requests.Content{}
×
UNCOV
233
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &contentRequest), func() {
×
UNCOV
234
                        reply, apiErr = r.GetContent(contentRequest)
×
UNCOV
235
                })
×
UNCOV
236
        case RouteGetNodes:
×
UNCOV
237
                nodesRequest := &requests.Nodes{}
×
UNCOV
238
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &nodesRequest), func() {
×
UNCOV
239
                        reply = r.GetNodes(nodesRequest)
×
UNCOV
240
                })
×
UNCOV
241
        case RouteUpdate:
×
UNCOV
242
                updateRequest := &requests.Update{}
×
UNCOV
243
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &updateRequest), func() {
×
UNCOV
244
                        reply = r.Update()
×
UNCOV
245
                })
×
246

247
        default:
×
248
                reply = responses.NewError(1, "unknown handler: "+string(route))
×
249
        }
250

251
        // error handling
UNCOV
252
        if jsonErr != nil {
×
253
                h.l.Error("could not read incoming json", zap.Error(jsonErr))
×
254
                reply = responses.NewError(2, "could not read incoming json "+jsonErr.Error())
×
UNCOV
255
        } else if apiErr != nil {
×
256
                h.l.Error("an API error occurred", zap.Error(apiErr))
×
257
                reply = responses.NewError(3, "internal error "+apiErr.Error())
×
258
        }
×
259

UNCOV
260
        return h.encodeReply(reply)
×
261
}
262

263
// encodeReply takes an interface and encodes it as JSON
264
// it returns the resulting JSON and a marshalling error
UNCOV
265
func (h *Socket) encodeReply(reply interface{}) (replyBytes []byte, err error) {
×
UNCOV
266
        replyBytes, err = json.Marshal(map[string]interface{}{
×
UNCOV
267
                "reply": reply,
×
UNCOV
268
        })
×
UNCOV
269
        if err != nil {
×
270
                h.l.Error("could not encode reply", zap.Error(err))
×
271
        }
×
UNCOV
272
        return
×
273
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc