• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

foomo / contentserver / 19674387362

25 Nov 2025 03:12PM UTC coverage: 41.746% (+0.08%) from 41.662%
19674387362

Pull #67

github

web-flow
Merge cabb48f16 into e7e5d09f5
Pull Request #67: Add Multi-Cloud Blob Storage Support (AWS S3, Azure, GCS)

166 of 366 new or added lines in 11 files covered. (45.36%)

3 existing lines in 2 files now uncovered.

875 of 2096 relevant lines covered (41.75%)

22387.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/handler/socket.go
1
package handler
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "io"
9
        "net"
10
        "strconv"
11
        "strings"
12
        "time"
13

14
        "github.com/foomo/contentserver/requests"
15
        "go.uber.org/zap"
16

17
        "github.com/foomo/contentserver/pkg/metrics"
18
        "github.com/foomo/contentserver/pkg/repo"
19
        "github.com/foomo/contentserver/responses"
20
)
21

22
// sourceSocketServer is the source label this handler passes to handleRequest,
// where it ends up as a label value on the request metrics.
const sourceSocketServer = "socketserver"
23

24
// Socket serves contentserver requests that arrive on raw net.Conn
// connections (see Serve).
type Socket struct {
25
        l    *zap.Logger // logger; NewSocket scopes it with the name "socket"
26
        repo *repo.Repo // repository the request handlers query and update
27
}
28

29
// ------------------------------------------------------------------------------------------------
30
// ~ Constructor
31
// ------------------------------------------------------------------------------------------------
32

33
// NewSocket returns a shiny new socket server
34
func NewSocket(l *zap.Logger, repo *repo.Repo) *Socket {
×
35
        inst := &Socket{
×
36
                l:    l.Named("socket"),
×
37
                repo: repo,
×
38
        }
×
39

×
40
        return inst
×
41
}
×
42

43
// ------------------------------------------------------------------------------------------------
44
// ~ Public methods
45
// ------------------------------------------------------------------------------------------------
46

47
func (h *Socket) Serve(conn net.Conn) {
×
48
        defer func() {
×
49
                if r := recover(); r != nil {
×
50
                        if err, ok := r.(error); ok {
×
51
                                if !errors.Is(err, io.EOF) {
×
52
                                        h.l.Error("panic in handle connection", zap.Error(err))
×
53
                                }
×
54
                        } else {
×
55
                                h.l.Error("panic in handle connection", zap.String("error", fmt.Sprint(r)))
×
56
                        }
×
57
                }
58
        }()
59

60
        // h.l.Debug("socketServer.handleConnection")
61
        metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Inc()
×
62

×
63
        var (
×
64
                headerBuffer [1]byte
×
65
                header       = ""
×
66
        )
×
67
        for {
×
68
                // let us read with 1 byte steps on conn until we find "{"
×
69
                _, readErr := conn.Read(headerBuffer[0:])
×
70
                if errors.Is(readErr, io.EOF) {
×
71
                        // client closed the connection
×
72
                        metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Dec()
×
73
                        return
×
74
                } else if readErr != nil {
×
75
                        h.l.Error("failed to read from connection", zap.Error(readErr))
×
76
                        return
×
77
                }
×
78
                // read next byte
79
                current := headerBuffer[0:]
×
80
                if string(current) == "{" {
×
81
                        // json has started
×
82
                        handler, jsonLength, headerErr := h.extractHandlerAndJSONLentgh(header)
×
83
                        // reset header
×
84
                        header = ""
×
85
                        if headerErr != nil {
×
86
                                h.l.Error("invalid request could not read header", zap.Error(headerErr))
×
87
                                encodedErr, encodingErr := h.encodeReply(responses.NewError(4, "invalid header "+headerErr.Error()))
×
88
                                if encodingErr == nil {
×
89
                                        h.writeResponse(conn, encodedErr)
×
90
                                } else {
×
91
                                        h.l.Error("could not respond to invalid request", zap.Error(encodingErr))
×
92
                                }
×
93
                                return
×
94
                        }
95
                        h.l.Debug("found json", zap.Int("length", jsonLength))
×
96
                        if jsonLength > 0 {
×
97
                                var (
×
98
                                        // let us try to read some json
×
99
                                        jsonBytes         = make([]byte, jsonLength)
×
100
                                        jsonLengthCurrent = 1
×
101
                                        readRound         = 0
×
102
                                )
×
103

×
104
                                // that is "{"
×
105
                                jsonBytes[0] = 123
×
106

×
107
                                for jsonLengthCurrent < jsonLength {
×
108
                                        readRound++
×
109
                                        readLength, jsonReadErr := conn.Read(jsonBytes[jsonLengthCurrent:jsonLength])
×
110
                                        if jsonReadErr != nil {
×
111
                                                // @fixme we need to force a read timeout (SetReadDeadline?), if expected jsonLength is lower than really sent bytes (e.g. if client implements protocol wrong)
×
112
                                                // @todo should we check for io.EOF here
×
113
                                                h.l.Error("could not read json - giving up with this client connection", zap.Error(jsonReadErr))
×
114
                                                metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Dec()
×
115
                                                return
×
116
                                        }
×
117
                                        jsonLengthCurrent += readLength
×
118
                                        h.l.Debug("read cycle status",
×
119
                                                zap.Int("jsonLengthCurrent", jsonLengthCurrent),
×
120
                                                zap.Int("jsonLength", jsonLength),
×
121
                                                zap.Int("readRound", readRound),
×
122
                                        )
×
123
                                }
124

125
                                h.l.Debug("read json", zap.Int("length", len(jsonBytes)))
×
126

×
127
                                h.writeResponse(conn, h.execute(handler, jsonBytes))
×
128
                                // note: connection remains open
×
129
                                continue
×
130
                        }
131
                        h.l.Error("can not read empty json")
×
132
                        metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Dec()
×
133
                        return
×
134
                }
135
                // adding to header byte by byte
136
                header += string(headerBuffer[0:])
×
137
        }
138
}
139

140
// ------------------------------------------------------------------------------------------------
141
// ~ Private methods
142
// ------------------------------------------------------------------------------------------------
143

144
func (h *Socket) extractHandlerAndJSONLentgh(header string) (route Route, jsonLength int, err error) {
×
145
        headerParts := strings.Split(header, ":")
×
146
        if len(headerParts) != 2 {
×
147
                return "", 0, errors.New("invalid header")
×
148
        }
×
149
        jsonLength, err = strconv.Atoi(headerParts[1])
×
150
        if err != nil {
×
151
                err = fmt.Errorf("could not parse length in header: %q", header)
×
152
        }
×
153
        return Route(headerParts[0]), jsonLength, err
×
154
}
155

156
func (h *Socket) execute(route Route, jsonBytes []byte) (reply []byte) {
×
157
        h.l.Debug("incoming json buffer", zap.Int("length", len(jsonBytes)))
×
158

×
159
        if route == RouteGetRepo {
×
NEW
160
                var b bytes.Buffer
×
NEW
161
                if err := h.repo.WriteRepoBytes(context.Background(), &b); err != nil {
×
NEW
162
                        h.l.Error("failed to write repo bytes", zap.Error(err))
×
NEW
163
                        errorReply, _ := h.encodeReply(responses.NewError(5, "failed to get repo: "+err.Error()))
×
NEW
164
                        return errorReply
×
NEW
165
                }
×
166
                return b.Bytes()
×
167
        }
168

169
        reply, handlingError := h.handleRequest(h.repo, route, jsonBytes, sourceSocketServer)
×
170
        if handlingError != nil {
×
171
                h.l.Error("socketServer.execute failed", zap.Error(handlingError))
×
172
        }
×
173
        return reply
×
174
}
175

176
func (h *Socket) writeResponse(conn net.Conn, reply []byte) {
×
177
        headerBytes := []byte(strconv.Itoa(len(reply)))
×
178
        reply = append(headerBytes, reply...)
×
179
        h.l.Debug("replying", zap.String("reply", string(reply)))
×
180
        n, writeError := conn.Write(reply)
×
181
        if writeError != nil {
×
182
                h.l.Error("socketServer.writeResponse: could not write reply", zap.Error(writeError))
×
183
                return
×
184
        }
×
185
        if n < len(reply) {
×
186
                h.l.Error("socketServer.writeResponse: write too short",
×
187
                        zap.Int("got", n),
×
188
                        zap.Int("expected", len(reply)),
×
189
                )
×
190
                return
×
191
        }
×
192
        h.l.Debug("replied. waiting for next request on open connection")
×
193
}
194

195
func (h *Socket) handleRequest(r *repo.Repo, route Route, jsonBytes []byte, source string) ([]byte, error) {
×
196
        start := time.Now()
×
197

×
198
        reply, err := h.executeRequest(r, route, jsonBytes, source)
×
199
        result := "success"
×
200
        if err != nil {
×
201
                result = "error"
×
202
        }
×
203

204
        metrics.ServiceRequestCounter.WithLabelValues(string(route), result, source).Inc()
×
205
        metrics.ServiceRequestDuration.WithLabelValues(string(route), result, source).Observe(time.Since(start).Seconds())
×
206

×
207
        return reply, err
×
208
}
209

210
func (h *Socket) executeRequest(r *repo.Repo, route Route, jsonBytes []byte, source string) (replyBytes []byte, err error) {
×
211
        var (
×
212
                reply             interface{}
×
213
                apiErr            error
×
214
                jsonErr           error
×
215
                processIfJSONIsOk = func(err error, processingFunc func()) {
×
216
                        if err != nil {
×
217
                                jsonErr = err
×
218
                                return
×
219
                        }
×
220
                        processingFunc()
×
221
                }
222
        )
223
        metrics.ContentRequestCounter.WithLabelValues(source).Inc()
×
224

×
225
        // handle and process
×
226
        switch route {
×
227
        // case RouteGetRepo: // This case is handled prior to handleRequest being called.
228
        // since the resulting bytes are written directly in to the http.ResponseWriter / net.Connection
229
        case RouteGetURIs:
×
230
                getURIRequest := &requests.URIs{}
×
231
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &getURIRequest), func() {
×
232
                        reply = r.GetURIs(getURIRequest.Dimension, getURIRequest.IDs)
×
233
                })
×
234
        case RouteGetContent:
×
235
                contentRequest := &requests.Content{}
×
236
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &contentRequest), func() {
×
237
                        reply, apiErr = r.GetContent(contentRequest)
×
238
                })
×
239
        case RouteGetNodes:
×
240
                nodesRequest := &requests.Nodes{}
×
241
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &nodesRequest), func() {
×
242
                        reply = r.GetNodes(nodesRequest)
×
243
                })
×
244
        case RouteUpdate:
×
245
                updateRequest := &requests.Update{}
×
246
                processIfJSONIsOk(json.Unmarshal(jsonBytes, &updateRequest), func() {
×
NEW
247
                        reply = r.Update(context.Background())
×
248
                })
×
249

250
        default:
×
251
                reply = responses.NewError(1, "unknown handler: "+string(route))
×
252
        }
253

254
        // error handling
255
        if jsonErr != nil {
×
256
                h.l.Error("could not read incoming json", zap.Error(jsonErr))
×
257
                reply = responses.NewError(2, "could not read incoming json "+jsonErr.Error())
×
258
        } else if apiErr != nil {
×
259
                h.l.Error("an API error occurred", zap.Error(apiErr))
×
260
                reply = responses.NewError(3, "internal error "+apiErr.Error())
×
261
        }
×
262

263
        return h.encodeReply(reply)
×
264
}
265

266
// encodeReply takes an interface and encodes it as JSON
267
// it returns the resulting JSON and a marshalling error
268
func (h *Socket) encodeReply(reply interface{}) (replyBytes []byte, err error) {
×
269
        replyBytes, err = json.Marshal(map[string]interface{}{
×
270
                "reply": reply,
×
271
        })
×
272
        if err != nil {
×
273
                h.l.Error("could not encode reply", zap.Error(err))
×
274
        }
×
275
        return
×
276
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc