• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

99designs / gqlgen / 12989129605

27 Jan 2025 12:36PM UTC coverage: 73.926% (+0.009%) from 73.917%
12989129605

Pull #3501

github

web-flow
chore(deps-dev): bump vite from 6.0.9 to 6.0.11 in /integration

Bumps [vite](https://github.com/vitejs/vite/tree/HEAD/packages/vite) from 6.0.9 to 6.0.11.
- [Release notes](https://github.com/vitejs/vite/releases)
- [Changelog](https://github.com/vitejs/vite/blob/main/packages/vite/CHANGELOG.md)
- [Commits](https://github.com/vitejs/vite/commits/v6.0.11/packages/vite)

---
updated-dependencies:
- dependency-name: vite
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #3501: chore(deps-dev): bump vite from 6.0.9 to 6.0.11 in /integration

8619 of 11659 relevant lines covered (73.93%)

531.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.2
/graphql/handler/transport/websocket.go
1
package transport
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "log"
10
        "net"
11
        "net/http"
12
        "sync"
13
        "time"
14

15
        "github.com/gorilla/websocket"
16
        "github.com/vektah/gqlparser/v2/gqlerror"
17

18
        "github.com/99designs/gqlgen/graphql"
19
        "github.com/99designs/gqlgen/graphql/errcode"
20
)
21

22
type (
23
        Websocket struct {
24
                Upgrader              websocket.Upgrader
25
                InitFunc              WebsocketInitFunc
26
                InitTimeout           time.Duration
27
                ErrorFunc             WebsocketErrorFunc
28
                CloseFunc             WebsocketCloseFunc
29
                KeepAlivePingInterval time.Duration
30
                PongOnlyInterval      time.Duration
31
                PingPongInterval      time.Duration
32
                /* If PingPongInterval has a non-0 duration, then when the server sends a ping
33
                 * it sets a ReadDeadline of PingPongInterval*2 and if the client doesn't respond
34
                 * with pong before that deadline is reached then the connection will die with a
35
                 * 1006 error code.
36
                 *
37
                 * MissingPongOk if true, tells the server to not use a ReadDeadline such that a
38
                 * missing/slow pong response from the client doesn't kill the connection.
39
                 */
40
                MissingPongOk bool
41

42
                didInjectSubprotocols bool
43
        }
44
        wsConnection struct {
45
                Websocket
46
                ctx             context.Context
47
                conn            *websocket.Conn
48
                me              messageExchanger
49
                active          map[string]context.CancelFunc
50
                mu              sync.Mutex
51
                keepAliveTicker *time.Ticker
52
                pongOnlyTicker  *time.Ticker
53
                pingPongTicker  *time.Ticker
54
                receivedPong    bool
55
                exec            graphql.GraphExecutor
56
                closed          bool
57
                headers         http.Header
58

59
                initPayload InitPayload
60
        }
61

62
        WebsocketInitFunc  func(ctx context.Context, initPayload InitPayload) (context.Context, *InitPayload, error)
63
        WebsocketErrorFunc func(ctx context.Context, err error)
64

65
        // Callback called when websocket is closed.
66
        WebsocketCloseFunc func(ctx context.Context, closeCode int)
67
)
68

69
var errReadTimeout = errors.New("read timeout")
70

71
type WebsocketError struct {
72
        Err error
73

74
        // IsReadError flags whether the error occurred on read or write to the websocket
75
        IsReadError bool
76
}
77

78
func (e WebsocketError) Error() string {
1✔
79
        if e.IsReadError {
2✔
80
                return fmt.Sprintf("websocket read: %v", e.Err)
1✔
81
        }
1✔
82
        return fmt.Sprintf("websocket write: %v", e.Err)
×
83
}
84

85
var (
86
        _ graphql.Transport = Websocket{}
87
        _ error             = WebsocketError{}
88
)
89

90
func (t Websocket) Supports(r *http.Request) bool {
90✔
91
        return r.Header.Get("Upgrade") != ""
90✔
92
}
90✔
93

94
func (t Websocket) Do(w http.ResponseWriter, r *http.Request, exec graphql.GraphExecutor) {
50✔
95
        t.injectGraphQLWSSubprotocols()
50✔
96
        ws, err := t.Upgrader.Upgrade(w, r, http.Header{})
50✔
97
        if err != nil {
50✔
98
                log.Printf("unable to upgrade %T to websocket %s: ", w, err.Error())
×
99
                SendErrorf(w, http.StatusBadRequest, "unable to upgrade")
×
100
                return
×
101
        }
×
102

103
        var me messageExchanger
50✔
104
        switch ws.Subprotocol() {
50✔
105
        default:
×
106
                msg := websocket.FormatCloseMessage(websocket.CloseProtocolError, fmt.Sprintf("unsupported negotiated subprotocol %s", ws.Subprotocol()))
×
107
                _ = ws.WriteMessage(websocket.CloseMessage, msg)
×
108
                return
×
109
        case graphqlwsSubprotocol, "":
42✔
110
                // clients are required to send a subprotocol, to be backward compatible with the previous implementation we select
42✔
111
                // "graphql-ws" by default
42✔
112
                me = graphqlwsMessageExchanger{c: ws}
42✔
113
        case graphqltransportwsSubprotocol:
8✔
114
                me = graphqltransportwsMessageExchanger{c: ws}
8✔
115
        }
116

117
        conn := wsConnection{
50✔
118
                active:    map[string]context.CancelFunc{},
50✔
119
                conn:      ws,
50✔
120
                ctx:       r.Context(),
50✔
121
                exec:      exec,
50✔
122
                me:        me,
50✔
123
                headers:   r.Header,
50✔
124
                Websocket: t,
50✔
125
        }
50✔
126

50✔
127
        if !conn.init() {
58✔
128
                return
8✔
129
        }
8✔
130

131
        conn.run()
42✔
132
}
133

134
func (c *wsConnection) handlePossibleError(err error, isReadError bool) {
198✔
135
        if c.ErrorFunc != nil && err != nil {
199✔
136
                c.ErrorFunc(c.ctx, WebsocketError{
1✔
137
                        Err:         err,
1✔
138
                        IsReadError: isReadError,
1✔
139
                })
1✔
140
        }
1✔
141
}
142

143
func (c *wsConnection) nextMessageWithTimeout(timeout time.Duration) (message, error) {
1✔
144
        messages, errs := make(chan message, 1), make(chan error, 1)
1✔
145

1✔
146
        go func() {
2✔
147
                if m, err := c.me.NextMessage(); err != nil {
2✔
148
                        errs <- err
1✔
149
                } else {
1✔
150
                        messages <- m
×
151
                }
×
152
        }()
153

154
        select {
1✔
155
        case m := <-messages:
×
156
                return m, nil
×
157
        case err := <-errs:
×
158
                return message{}, err
×
159
        case <-time.After(timeout):
1✔
160
                return message{}, errReadTimeout
1✔
161
        }
162
}
163

164
func (c *wsConnection) init() bool {
50✔
165
        var m message
50✔
166
        var err error
50✔
167

50✔
168
        if c.InitTimeout != 0 {
51✔
169
                m, err = c.nextMessageWithTimeout(c.InitTimeout)
1✔
170
        } else {
50✔
171
                m, err = c.me.NextMessage()
49✔
172
        }
49✔
173

174
        if err != nil {
53✔
175
                if err == errReadTimeout {
4✔
176
                        c.close(websocket.CloseProtocolError, "connection initialisation timeout")
1✔
177
                        return false
1✔
178
                }
1✔
179

180
                if err == errInvalidMsg {
3✔
181
                        c.sendConnectionError("invalid json")
1✔
182
                }
1✔
183

184
                c.close(websocket.CloseProtocolError, "decoding error")
2✔
185
                return false
2✔
186
        }
187

188
        switch m.t {
47✔
189
        case initMessageType:
45✔
190
                if len(m.payload) > 0 {
47✔
191
                        c.initPayload = make(InitPayload)
2✔
192
                        err := json.Unmarshal(m.payload, &c.initPayload)
2✔
193
                        if err != nil {
2✔
194
                                return false
×
195
                        }
×
196
                }
197

198
                var initAckPayload *InitPayload
45✔
199
                if c.InitFunc != nil {
54✔
200
                        var ctx context.Context
9✔
201
                        ctx, initAckPayload, err = c.InitFunc(c.ctx, c.initPayload)
9✔
202
                        if err != nil {
12✔
203
                                c.sendConnectionError("%s", err.Error())
3✔
204
                                c.close(websocket.CloseNormalClosure, "terminated")
3✔
205
                                return false
3✔
206
                        }
3✔
207
                        c.ctx = ctx
6✔
208
                }
209

210
                if initAckPayload != nil {
43✔
211
                        initJsonAckPayload, err := json.Marshal(*initAckPayload)
1✔
212
                        if err != nil {
1✔
213
                                panic(err)
×
214
                        }
215
                        c.write(&message{t: connectionAckMessageType, payload: initJsonAckPayload})
1✔
216
                } else {
41✔
217
                        c.write(&message{t: connectionAckMessageType})
41✔
218
                }
41✔
219
                c.write(&message{t: keepAliveMessageType})
42✔
220
        case connectionCloseMessageType:
1✔
221
                c.close(websocket.CloseNormalClosure, "terminated")
1✔
222
                return false
1✔
223
        default:
1✔
224
                c.sendConnectionError("unexpected message %s", m.t)
1✔
225
                c.close(websocket.CloseProtocolError, "unexpected message")
1✔
226
                return false
1✔
227
        }
228

229
        return true
42✔
230
}
231

232
func (c *wsConnection) write(msg *message) {
164✔
233
        c.mu.Lock()
164✔
234
        c.handlePossibleError(c.me.Send(msg), false)
164✔
235
        c.mu.Unlock()
164✔
236
}
164✔
237

238
func (c *wsConnection) run() {
42✔
239
        // We create a cancellation that will shutdown the keep-alive when we leave
42✔
240
        // this function.
42✔
241
        ctx, cancel := context.WithCancel(c.ctx)
42✔
242
        defer func() {
83✔
243
                cancel()
41✔
244
        }()
41✔
245

246
        // If we're running in graphql-ws mode, create a timer that will trigger a
247
        // keep alive message every interval
248
        if (c.conn.Subprotocol() == "" || c.conn.Subprotocol() == graphqlwsSubprotocol) && c.KeepAlivePingInterval != 0 {
62✔
249
                c.mu.Lock()
20✔
250
                c.keepAliveTicker = time.NewTicker(c.KeepAlivePingInterval)
20✔
251
                c.mu.Unlock()
20✔
252

20✔
253
                go c.keepAlive(ctx)
20✔
254
        }
20✔
255

256
        // If we're running in graphql-transport-ws mode, create a timer that will trigger a
257
        // just a pong message every interval
258
        if c.conn.Subprotocol() == graphqltransportwsSubprotocol && c.PongOnlyInterval != 0 {
43✔
259
                c.mu.Lock()
1✔
260
                c.pongOnlyTicker = time.NewTicker(c.PongOnlyInterval)
1✔
261
                c.mu.Unlock()
1✔
262

1✔
263
                go c.keepAlivePongOnly(ctx)
1✔
264
        }
1✔
265

266
        // If we're running in graphql-transport-ws mode, create a timer that will
267
        // trigger a ping message every interval and expect a pong!
268
        if c.conn.Subprotocol() == graphqltransportwsSubprotocol && c.PingPongInterval != 0 {
46✔
269
                c.mu.Lock()
4✔
270
                c.pingPongTicker = time.NewTicker(c.PingPongInterval)
4✔
271
                c.mu.Unlock()
4✔
272

4✔
273
                if !c.MissingPongOk {
7✔
274
                        // Note: when the connection is closed by this deadline, the client
3✔
275
                        // will receive an "invalid close code"
3✔
276
                        _ = c.conn.SetReadDeadline(time.Now().UTC().Add(2 * c.PingPongInterval))
3✔
277
                }
3✔
278
                go c.ping(ctx)
4✔
279
        }
280

281
        // Close the connection when the context is cancelled.
282
        // Will optionally send a "close reason" that is retrieved from the context.
283
        go c.closeOnCancel(ctx)
42✔
284

42✔
285
        for {
112✔
286
                start := graphql.Now()
70✔
287
                m, err := c.me.NextMessage()
70✔
288
                if err != nil {
108✔
289
                        // If the connection got closed by us, don't report the error
38✔
290
                        if !errors.Is(err, net.ErrClosed) {
73✔
291
                                c.handlePossibleError(err, true)
35✔
292
                        }
35✔
293
                        return
38✔
294
                }
295

296
                switch m.t {
31✔
297
                case startMessageType:
24✔
298
                        c.subscribe(start, &m)
24✔
299
                case stopMessageType:
2✔
300
                        c.mu.Lock()
2✔
301
                        closer := c.active[m.id]
2✔
302
                        c.mu.Unlock()
2✔
303
                        if closer != nil {
4✔
304
                                closer()
2✔
305
                        }
2✔
306
                case connectionCloseMessageType:
3✔
307
                        c.close(websocket.CloseNormalClosure, "terminated")
3✔
308
                        return
3✔
309
                case pingMessageType:
1✔
310
                        c.write(&message{t: pongMessageType, payload: m.payload})
1✔
311
                case pongMessageType:
1✔
312
                        c.mu.Lock()
1✔
313
                        c.receivedPong = true
1✔
314
                        c.mu.Unlock()
1✔
315
                        // Clear ReadTimeout -- 0 time val clears.
1✔
316
                        _ = c.conn.SetReadDeadline(time.Time{})
1✔
317
                default:
×
318
                        c.sendConnectionError("unexpected message %s", m.t)
×
319
                        c.close(websocket.CloseProtocolError, "unexpected message")
×
320
                        return
×
321
                }
322
        }
323
}
324

325
func (c *wsConnection) keepAlivePongOnly(ctx context.Context) {
1✔
326
        for {
5✔
327
                select {
4✔
328
                case <-ctx.Done():
×
329
                        c.pongOnlyTicker.Stop()
×
330
                        return
×
331
                case <-c.pongOnlyTicker.C:
3✔
332
                        c.write(&message{t: pongMessageType, payload: json.RawMessage{}})
3✔
333
                }
334
        }
335
}
336

337
func (c *wsConnection) keepAlive(ctx context.Context) {
20✔
338
        for {
47✔
339
                select {
27✔
340
                case <-ctx.Done():
20✔
341
                        c.keepAliveTicker.Stop()
20✔
342
                        return
20✔
343
                case <-c.keepAliveTicker.C:
7✔
344
                        c.write(&message{t: keepAliveMessageType})
7✔
345
                }
346
        }
347
}
348

349
func (c *wsConnection) ping(ctx context.Context) {
4✔
350
        for {
15✔
351
                select {
11✔
352
                case <-ctx.Done():
4✔
353
                        c.pingPongTicker.Stop()
4✔
354
                        return
4✔
355
                case <-c.pingPongTicker.C:
7✔
356
                        c.write(&message{t: pingMessageType, payload: json.RawMessage{}})
7✔
357
                        // The initial deadline for this method is set in run()
7✔
358
                        // if we have not yet received a pong, don't reset the deadline.
7✔
359
                        c.mu.Lock()
7✔
360
                        if !c.MissingPongOk && c.receivedPong {
8✔
361
                                _ = c.conn.SetReadDeadline(time.Now().UTC().Add(2 * c.PingPongInterval))
1✔
362
                        }
1✔
363
                        c.receivedPong = false
7✔
364
                        c.mu.Unlock()
7✔
365
                }
366
        }
367
}
368

369
func (c *wsConnection) closeOnCancel(ctx context.Context) {
42✔
370
        <-ctx.Done()
42✔
371

42✔
372
        if r := closeReasonForContext(ctx); r != "" {
43✔
373
                c.sendConnectionError("%s", r)
1✔
374
        }
1✔
375
        c.close(websocket.CloseNormalClosure, "terminated")
41✔
376
}
377

378
func (c *wsConnection) subscribe(start time.Time, msg *message) {
24✔
379
        ctx := graphql.StartOperationTrace(c.ctx)
24✔
380
        var params *graphql.RawParams
24✔
381
        if err := jsonDecode(bytes.NewReader(msg.payload), &params); err != nil {
24✔
382
                c.sendError(msg.id, &gqlerror.Error{Message: "invalid json"})
×
383
                c.complete(msg.id)
×
384
                return
×
385
        }
×
386

387
        params.ReadTime = graphql.TraceTiming{
24✔
388
                Start: start,
24✔
389
                End:   graphql.Now(),
24✔
390
        }
24✔
391

24✔
392
        params.Headers = c.headers
24✔
393

24✔
394
        rc, err := c.exec.CreateOperationContext(ctx, params)
24✔
395
        if err != nil {
25✔
396
                resp := c.exec.DispatchError(graphql.WithOperationContext(ctx, rc), err)
1✔
397
                switch errcode.GetErrorKind(err) {
1✔
398
                case errcode.KindProtocol:
1✔
399
                        c.sendError(msg.id, resp.Errors...)
1✔
400
                default:
×
401
                        c.sendResponse(msg.id, &graphql.Response{Errors: err})
×
402
                }
403

404
                c.complete(msg.id)
1✔
405
                return
1✔
406
        }
407

408
        ctx = graphql.WithOperationContext(ctx, rc)
23✔
409

23✔
410
        if c.initPayload != nil {
25✔
411
                ctx = withInitPayload(ctx, c.initPayload)
2✔
412
        }
2✔
413

414
        ctx, cancel := context.WithCancel(ctx)
23✔
415
        c.mu.Lock()
23✔
416
        c.active[msg.id] = cancel
23✔
417
        c.mu.Unlock()
23✔
418

23✔
419
        go func() {
46✔
420
                ctx = withSubscriptionErrorContext(ctx)
23✔
421
                defer func() {
44✔
422
                        if r := recover(); r != nil {
21✔
423
                                err := rc.Recover(ctx, r)
×
424
                                var gqlerr *gqlerror.Error
×
425
                                if !errors.As(err, &gqlerr) {
×
426
                                        gqlerr = &gqlerror.Error{}
×
427
                                        if err != nil {
×
428
                                                gqlerr.Message = err.Error()
×
429
                                        }
×
430
                                }
431
                                c.sendError(msg.id, gqlerr)
×
432
                        }
433
                        if errs := getSubscriptionError(ctx); len(errs) != 0 {
21✔
434
                                c.sendError(msg.id, errs...)
×
435
                        } else {
21✔
436
                                c.complete(msg.id)
21✔
437
                        }
21✔
438
                        c.mu.Lock()
21✔
439
                        delete(c.active, msg.id)
21✔
440
                        c.mu.Unlock()
21✔
441
                        cancel()
21✔
442
                }()
443

444
                responses, ctx := c.exec.DispatchOperation(ctx, rc)
23✔
445
                for {
78✔
446
                        response := responses(ctx)
55✔
447
                        if response == nil {
76✔
448
                                break
21✔
449
                        }
450

451
                        c.sendResponse(msg.id, response)
33✔
452
                }
453

454
                // complete and context cancel comes from the defer
455
        }()
456
}
457

458
func (c *wsConnection) sendResponse(id string, response *graphql.Response) {
33✔
459
        b, err := json.Marshal(response)
33✔
460
        if err != nil {
33✔
461
                panic(err)
×
462
        }
463
        c.write(&message{
33✔
464
                payload: b,
33✔
465
                id:      id,
33✔
466
                t:       dataMessageType,
33✔
467
        })
33✔
468
}
469

470
func (c *wsConnection) complete(id string) {
22✔
471
        c.write(&message{id: id, t: completeMessageType})
22✔
472
}
22✔
473

474
func (c *wsConnection) sendError(id string, errors ...*gqlerror.Error) {
1✔
475
        errs := make([]error, len(errors))
1✔
476
        for i, err := range errors {
2✔
477
                errs[i] = err
1✔
478
        }
1✔
479
        b, err := json.Marshal(errs)
1✔
480
        if err != nil {
1✔
481
                panic(err)
×
482
        }
483
        c.write(&message{t: errorMessageType, id: id, payload: b})
1✔
484
}
485

486
func (c *wsConnection) sendConnectionError(format string, args ...any) {
6✔
487
        b, err := json.Marshal(&gqlerror.Error{Message: fmt.Sprintf(format, args...)})
6✔
488
        if err != nil {
6✔
489
                panic(err)
×
490
        }
491

492
        c.write(&message{t: connectionErrorMessageType, payload: b})
6✔
493
}
494

495
func (c *wsConnection) close(closeCode int, message string) {
52✔
496
        c.mu.Lock()
52✔
497
        if c.closed {
55✔
498
                c.mu.Unlock()
3✔
499
                return
3✔
500
        }
3✔
501
        _ = c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(closeCode, message))
48✔
502
        for _, closer := range c.active {
54✔
503
                closer()
6✔
504
        }
6✔
505
        c.closed = true
48✔
506
        c.mu.Unlock()
48✔
507
        _ = c.conn.Close()
48✔
508

48✔
509
        if c.CloseFunc != nil {
53✔
510
                c.CloseFunc(c.ctx, closeCode)
5✔
511
        }
5✔
512
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc