• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hprose / hprose-golang / 7816848764

07 Feb 2024 03:05PM UTC coverage: 85.675% (+0.1%) from 85.565%
7816848764

push

github

andot
add StructType & ListType support for decoder

36 of 39 new or added lines in 3 files covered. (92.31%)

27 existing lines in 8 files now uncovered.

9982 of 11651 relevant lines covered (85.68%)

208432.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.51
/rpc/socket/handler.go
1
/*--------------------------------------------------------*\
2
|                                                          |
3
|                          hprose                          |
4
|                                                          |
5
| Official WebSite: https://hprose.com                     |
6
|                                                          |
7
| rpc/socket/handler.go                                    |
8
|                                                          |
9
| LastModified: Nov 22, 2022                               |
10
| Author: Ma Bingyao <andot@hprose.com>                    |
11
|                                                          |
12
\*________________________________________________________*/
13

14
package socket
15

16
import (
17
        "context"
18
        "crypto/tls"
19
        "io"
20
        "math"
21
        "net"
22
        "reflect"
23
        "time"
24

25
        "github.com/hprose/hprose-golang/v3/internal/convert"
26
        "github.com/hprose/hprose-golang/v3/rpc/core"
27
)
28

29
type Handler struct {
30
        Service  *core.Service
31
        Pool     core.WorkerPool
32
        OnAccept func(net.Conn) net.Conn
33
        OnClose  func(net.Conn)
34
        OnError  func(net.Conn, error)
35
}
36

37
// BindContext to the http server.
38
func (h *Handler) BindContext(ctx context.Context, server core.Server) {
729✔
39
        go h.bind(ctx, server.(net.Listener))
729✔
40
}
729✔
41

42
func (h *Handler) bind(ctx context.Context, listener net.Listener) {
729✔
43
        ctx, cancel := context.WithCancel(ctx)
729✔
44
        defer cancel()
729✔
45
        var tempDelay time.Duration // how long to sleep on accept failure
729✔
46
        for {
3,112✔
47
                conn, err := listener.Accept()
2,383✔
48
                if err != nil {
3,112✔
49
                        tempDelay = nextTempDelay(err, h.onError, tempDelay)
729✔
50
                        if tempDelay > 0 {
729✔
51
                                continue
×
52
                        }
53
                        return
729✔
54
                }
55
                tempDelay = 0
1,654✔
56
                go h.Serve(ctx, conn)
1,654✔
57
        }
58
}
59

60
func (h *Handler) onAccept(conn net.Conn) net.Conn {
1,654✔
61
        if h.OnAccept != nil {
1,672✔
62
                return h.OnAccept(conn)
18✔
63
        }
18✔
64
        return conn
1,636✔
65
}
66

67
func (h *Handler) onClose(conn net.Conn) {
1,654✔
68
        if h.OnClose != nil {
1,672✔
69
                h.OnClose(conn)
18✔
70
        }
18✔
71
}
72

73
func (h *Handler) onError(conn net.Conn, err error) {
1,654✔
74
        if h.OnError != nil {
1,672✔
75
                h.OnError(conn, err)
18✔
76
        }
18✔
77
}
78

79
func (h *Handler) reportError(ctx context.Context, errChan chan error, err error) {
1,661✔
80
        select {
1,661✔
81
        case <-ctx.Done():
380✔
82
        case errChan <- err:
1,281✔
83
        default:
×
84
        }
85
}
86

87
func (h *Handler) sendResponse(ctx context.Context, queue chan data, index int, body []byte, err error) {
21,950✔
88
        select {
21,950✔
89
        case <-ctx.Done():
334✔
90
        case queue <- data{
91
                Index: index,
92
                Body:  body,
93
                Error: err,
94
        }:
21,616✔
95
        }
96
}
97

98
func (h *Handler) getServiceContext(conn net.Conn) *core.ServiceContext {
21,941✔
99
        serviceContext := core.NewServiceContext(h.Service)
21,941✔
100
        serviceContext.Items().Set("conn", conn)
21,941✔
101
        serviceContext.LocalAddr = conn.LocalAddr()
21,941✔
102
        serviceContext.RemoteAddr = conn.RemoteAddr()
21,941✔
103
        serviceContext.Handler = h
21,941✔
104
        return serviceContext
21,941✔
105
}
21,941✔
106

107
func (h *Handler) run(ctx context.Context, queue chan data, index int, body []byte) {
21,941✔
108
        var err error
21,941✔
109
        defer func() {
43,882✔
110
                if e := recover(); e != nil {
21,941✔
111
                        err = core.NewPanicError(e)
×
112
                }
×
113
                h.sendResponse(ctx, queue, index, body, err)
21,941✔
114
        }()
115
        body, err = h.Service.Handle(ctx, body)
21,941✔
116
}
117

118
func (h *Handler) task(ctx context.Context, queue chan data, index int, body []byte) func() {
×
119
        return func() {
×
120
                h.run(ctx, queue, index, body)
×
121
        }
×
122
}
123

124
func (h *Handler) catch(ctx context.Context, errChan chan error) {
3,308✔
125
        if e := recover(); e != nil {
3,308✔
126
                h.reportError(ctx, errChan, core.NewPanicError(e))
×
127
        }
×
128
}
129

130
func (h *Handler) receive(ctx context.Context, conn net.Conn, queue chan data, errChan chan error) {
1,654✔
131
        defer h.catch(ctx, errChan)
1,654✔
132
        var header [12]byte
1,654✔
133
        for {
25,249✔
134
                select {
23,595✔
135
                case <-ctx.Done():
1✔
136
                        return
1✔
137
                default:
23,594✔
138
                        if _, err := io.ReadAtLeast(conn, header[:], 12); err != nil {
24,336✔
139
                                h.reportError(ctx, errChan, err)
742✔
140
                                return
742✔
141
                        }
742✔
142
                        length, index, ok := parseHeader(header)
22,852✔
143
                        if length == 0 && index == -1 && !ok {
23,752✔
144
                                h.reportError(ctx, errChan, core.InvalidRequestError{})
900✔
145
                                return
900✔
146
                        }
900✔
147
                        if length > h.Service.MaxRequestLength {
21,961✔
148
                                h.sendResponse(ctx, queue, index, nil, core.ErrRequestEntityTooLarge)
9✔
149
                                return
9✔
150
                        }
9✔
151
                        body := make([]byte, length)
21,943✔
152
                        if _, err := io.ReadAtLeast(conn, body, length); err != nil {
21,945✔
UNCOV
153
                                h.reportError(ctx, errChan, err)
2✔
UNCOV
154
                                return
2✔
UNCOV
155
                        }
2✔
156
                        if h.Pool != nil {
21,941✔
157
                                h.Pool.Submit(h.task(core.WithContext(ctx, h.getServiceContext(conn)), queue, index, body))
×
158
                        } else {
21,941✔
159
                                go h.run(core.WithContext(ctx, h.getServiceContext(conn)), queue, index, body)
21,941✔
160
                        }
21,941✔
161
                }
162
        }
163
}
164

165
func (h *Handler) send(ctx context.Context, conn net.Conn, queue chan data, errChan chan error) {
1,654✔
166
        defer h.catch(ctx, errChan)
1,654✔
167
        for {
24,907✔
168
                select {
23,253✔
169
                case <-ctx.Done():
1,637✔
170
                        return
1,637✔
171
                case response := <-queue:
21,616✔
172
                        index, body, e := response.Index, response.Body, response.Error
21,616✔
173
                        if e != nil {
21,625✔
174
                                index |= math.MinInt32
9✔
175
                                if e == core.ErrRequestEntityTooLarge {
18✔
176
                                        body = convert.ToUnsafeBytes(core.RequestEntityTooLarge)
9✔
177
                                } else {
9✔
178
                                        body = convert.ToUnsafeBytes(e.Error())
×
179
                                }
×
180
                        }
181
                        header := makeHeader(len(body), index)
21,616✔
182
                        _, err := conn.Write(header[:])
21,616✔
183
                        if err == nil {
43,228✔
184
                                _, err = conn.Write(body)
21,612✔
185
                        }
21,612✔
186
                        if err != nil {
21,624✔
187
                                h.reportError(ctx, errChan, err)
8✔
188
                                return
8✔
189
                        }
8✔
190
                        if e != nil {
21,617✔
191
                                h.reportError(ctx, errChan, e)
9✔
192
                                return
9✔
193
                        }
9✔
194
                }
195
        }
196
}
197

198
func (h *Handler) Serve(ctx context.Context, conn net.Conn) {
1,654✔
199
        if conn = h.onAccept(conn); conn == nil {
1,654✔
200
                return
×
201
        }
×
202
        ctx, cancel := context.WithCancel(ctx)
1,654✔
203
        var err error
1,654✔
204
        defer func() {
3,308✔
205
                cancel()
1,654✔
206
                if e := recover(); e != nil {
1,654✔
207
                        err = core.NewPanicError(e)
×
208
                }
×
209
                if err != nil {
3,308✔
210
                        h.onError(conn, err)
1,654✔
211
                }
1,654✔
212
                h.onClose(conn)
1,654✔
213
                conn.Close()
1,654✔
214
        }()
215
        queue := make(chan data)
1,654✔
216
        errChan := make(chan error, 1)
1,654✔
217
        go h.receive(ctx, conn, queue, errChan)
1,654✔
218
        go h.send(ctx, conn, queue, errChan)
1,654✔
219
        select {
1,654✔
220
        case <-ctx.Done():
736✔
221
                err = ctx.Err()
736✔
222
        case err = <-errChan:
918✔
223
        }
224
}
225

226
// handlerFactory creates socket Handlers and carries the list of server types
// it is registered for.
type handlerFactory struct {
	// serverTypes is the list returned by ServerTypes.
	serverTypes []reflect.Type
}
229

230
func (factory handlerFactory) ServerTypes() []reflect.Type {
45✔
231
        return factory.serverTypes
45✔
232
}
45✔
233

234
func (factory handlerFactory) New(service *core.Service) core.Handler {
1,026✔
235
        return &Handler{
1,026✔
236
                Service: service,
1,026✔
237
        }
1,026✔
238
}
1,026✔
239

240
func RegisterHandler() {
45✔
241
        core.RegisterHandler("socket", handlerFactory{
45✔
242
                []reflect.Type{
45✔
243
                        reflect.TypeOf((*net.TCPListener)(nil)),
45✔
244
                        reflect.TypeOf((*net.UnixListener)(nil)),
45✔
245
                        reflect.TypeOf(tls.NewListener(nil, nil)),
45✔
246
                },
45✔
247
        })
45✔
248
}
45✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc