• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hprose / hprose-golang / 7946569327

18 Feb 2024 04:25AM UTC coverage: 85.671% (-0.07%) from 85.74%
7946569327

push

github

andot
rename StructType & ListType

4 of 4 new or added lines in 3 files covered. (100.0%)

22 existing lines in 5 files now uncovered.

9997 of 11669 relevant lines covered (85.67%)

208281.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.65
/rpc/socket/handler.go
1
/*--------------------------------------------------------*\
2
|                                                          |
3
|                          hprose                          |
4
|                                                          |
5
| Official WebSite: https://hprose.com                     |
6
|                                                          |
7
| rpc/socket/handler.go                                    |
8
|                                                          |
9
| LastModified: Nov 22, 2022                               |
10
| Author: Ma Bingyao <andot@hprose.com>                    |
11
|                                                          |
12
\*________________________________________________________*/
13

14
package socket
15

16
import (
17
        "context"
18
        "crypto/tls"
19
        "io"
20
        "math"
21
        "net"
22
        "reflect"
23
        "time"
24

25
        "github.com/hprose/hprose-golang/v3/internal/convert"
26
        "github.com/hprose/hprose-golang/v3/rpc/core"
27
)
28

29
type Handler struct {
30
        Service  *core.Service
31
        Pool     core.WorkerPool
32
        OnAccept func(net.Conn) net.Conn
33
        OnClose  func(net.Conn)
34
        OnError  func(net.Conn, error)
35
}
36

37
// BindContext to the http server.
38
func (h *Handler) BindContext(ctx context.Context, server core.Server) {
729✔
39
        go h.bind(ctx, server.(net.Listener))
729✔
40
}
729✔
41

42
func (h *Handler) bind(ctx context.Context, listener net.Listener) {
729✔
43
        ctx, cancel := context.WithCancel(ctx)
729✔
44
        defer cancel()
729✔
45
        var tempDelay time.Duration // how long to sleep on accept failure
729✔
46
        for {
3,113✔
47
                conn, err := listener.Accept()
2,384✔
48
                if err != nil {
3,113✔
49
                        tempDelay = nextTempDelay(err, h.onError, tempDelay)
729✔
50
                        if tempDelay > 0 {
729✔
51
                                continue
×
52
                        }
53
                        return
729✔
54
                }
55
                tempDelay = 0
1,655✔
56
                go h.Serve(ctx, conn)
1,655✔
57
        }
58
}
59

60
func (h *Handler) onAccept(conn net.Conn) net.Conn {
1,655✔
61
        if h.OnAccept != nil {
1,673✔
62
                return h.OnAccept(conn)
18✔
63
        }
18✔
64
        return conn
1,637✔
65
}
66

67
func (h *Handler) onClose(conn net.Conn) {
1,655✔
68
        if h.OnClose != nil {
1,673✔
69
                h.OnClose(conn)
18✔
70
        }
18✔
71
}
72

73
func (h *Handler) onError(conn net.Conn, err error) {
1,655✔
74
        if h.OnError != nil {
1,673✔
75
                h.OnError(conn, err)
18✔
76
        }
18✔
77
}
78

79
func (h *Handler) reportError(ctx context.Context, errChan chan error, err error) {
1,661✔
80
        select {
1,661✔
81
        case <-ctx.Done():
370✔
82
        case errChan <- err:
1,291✔
83
        default:
×
84
        }
85
}
86

87
func (h *Handler) sendResponse(ctx context.Context, queue chan data, index int, body []byte, err error) {
21,452✔
88
        select {
21,452✔
89
        case <-ctx.Done():
259✔
90
        case queue <- data{
91
                Index: index,
92
                Body:  body,
93
                Error: err,
94
        }:
21,193✔
95
        }
96
}
97

98
func (h *Handler) getServiceContext(conn net.Conn) *core.ServiceContext {
21,443✔
99
        serviceContext := core.NewServiceContext(h.Service)
21,443✔
100
        serviceContext.Items().Set("conn", conn)
21,443✔
101
        serviceContext.LocalAddr = conn.LocalAddr()
21,443✔
102
        serviceContext.RemoteAddr = conn.RemoteAddr()
21,443✔
103
        serviceContext.Handler = h
21,443✔
104
        return serviceContext
21,443✔
105
}
21,443✔
106

107
func (h *Handler) run(ctx context.Context, queue chan data, index int, body []byte) {
21,443✔
108
        var err error
21,443✔
109
        defer func() {
42,886✔
110
                if e := recover(); e != nil {
21,443✔
111
                        err = core.NewPanicError(e)
×
112
                }
×
113
                h.sendResponse(ctx, queue, index, body, err)
21,443✔
114
        }()
115
        body, err = h.Service.Handle(ctx, body)
21,443✔
116
}
117

118
func (h *Handler) task(ctx context.Context, queue chan data, index int, body []byte) func() {
×
119
        return func() {
×
120
                h.run(ctx, queue, index, body)
×
121
        }
×
122
}
123

124
func (h *Handler) catch(ctx context.Context, errChan chan error) {
3,310✔
125
        if e := recover(); e != nil {
3,310✔
126
                h.reportError(ctx, errChan, core.NewPanicError(e))
×
127
        }
×
128
}
129

130
func (h *Handler) receive(ctx context.Context, conn net.Conn, queue chan data, errChan chan error) {
1,655✔
131
        defer h.catch(ctx, errChan)
1,655✔
132
        var header [12]byte
1,655✔
133
        for {
24,753✔
134
                select {
23,098✔
UNCOV
135
                case <-ctx.Done():
1✔
UNCOV
136
                        return
1✔
137
                default:
23,097✔
138
                        if _, err := io.ReadAtLeast(conn, header[:], 12); err != nil {
23,842✔
139
                                h.reportError(ctx, errChan, err)
745✔
140
                                return
745✔
141
                        }
745✔
142
                        length, index, ok := parseHeader(header)
22,352✔
143
                        if length == 0 && index == -1 && !ok {
23,252✔
144
                                h.reportError(ctx, errChan, core.InvalidRequestError{})
900✔
145
                                return
900✔
146
                        }
900✔
147
                        if length > h.Service.MaxRequestLength {
21,461✔
148
                                h.sendResponse(ctx, queue, index, nil, core.ErrRequestEntityTooLarge)
9✔
149
                                return
9✔
150
                        }
9✔
151
                        body := make([]byte, length)
21,443✔
152
                        if _, err := io.ReadAtLeast(conn, body, length); err != nil {
21,443✔
UNCOV
153
                                h.reportError(ctx, errChan, err)
×
UNCOV
154
                                return
×
UNCOV
155
                        }
×
156
                        if h.Pool != nil {
21,443✔
157
                                h.Pool.Submit(h.task(core.WithContext(ctx, h.getServiceContext(conn)), queue, index, body))
×
158
                        } else {
21,443✔
159
                                go h.run(core.WithContext(ctx, h.getServiceContext(conn)), queue, index, body)
21,443✔
160
                        }
21,443✔
161
                }
162
        }
163
}
164

165
func (h *Handler) send(ctx context.Context, conn net.Conn, queue chan data, errChan chan error) {
1,655✔
166
        defer h.catch(ctx, errChan)
1,655✔
167
        for {
24,487✔
168
                select {
22,832✔
169
                case <-ctx.Done():
1,639✔
170
                        return
1,639✔
171
                case response := <-queue:
21,193✔
172
                        index, body, e := response.Index, response.Body, response.Error
21,193✔
173
                        if e != nil {
21,202✔
174
                                index |= math.MinInt32
9✔
175
                                if e == core.ErrRequestEntityTooLarge {
18✔
176
                                        body = convert.ToUnsafeBytes(core.RequestEntityTooLarge)
9✔
177
                                } else {
9✔
178
                                        body = convert.ToUnsafeBytes(e.Error())
×
179
                                }
×
180
                        }
181
                        header := makeHeader(len(body), index)
21,193✔
182
                        _, err := conn.Write(header[:])
21,193✔
183
                        if err == nil {
42,383✔
184
                                _, err = conn.Write(body)
21,190✔
185
                        }
21,190✔
186
                        if err != nil {
21,200✔
187
                                h.reportError(ctx, errChan, err)
7✔
188
                                return
7✔
189
                        }
7✔
190
                        if e != nil {
21,195✔
191
                                h.reportError(ctx, errChan, e)
9✔
192
                                return
9✔
193
                        }
9✔
194
                }
195
        }
196
}
197

198
func (h *Handler) Serve(ctx context.Context, conn net.Conn) {
1,655✔
199
        if conn = h.onAccept(conn); conn == nil {
1,655✔
200
                return
×
201
        }
×
202
        ctx, cancel := context.WithCancel(ctx)
1,655✔
203
        var err error
1,655✔
204
        defer func() {
3,310✔
205
                cancel()
1,655✔
206
                if e := recover(); e != nil {
1,655✔
207
                        err = core.NewPanicError(e)
×
208
                }
×
209
                if err != nil {
3,310✔
210
                        h.onError(conn, err)
1,655✔
211
                }
1,655✔
212
                h.onClose(conn)
1,655✔
213
                conn.Close()
1,655✔
214
        }()
215
        queue := make(chan data)
1,655✔
216
        errChan := make(chan error, 1)
1,655✔
217
        go h.receive(ctx, conn, queue, errChan)
1,655✔
218
        go h.send(ctx, conn, queue, errChan)
1,655✔
219
        select {
1,655✔
220
        case <-ctx.Done():
737✔
221
                err = ctx.Err()
737✔
222
        case err = <-errChan:
918✔
223
        }
224
}
225

226
type handlerFactory struct {
227
        serverTypes []reflect.Type
228
}
229

230
func (factory handlerFactory) ServerTypes() []reflect.Type {
45✔
231
        return factory.serverTypes
45✔
232
}
45✔
233

234
func (factory handlerFactory) New(service *core.Service) core.Handler {
1,026✔
235
        return &Handler{
1,026✔
236
                Service: service,
1,026✔
237
        }
1,026✔
238
}
1,026✔
239

240
func RegisterHandler() {
45✔
241
        core.RegisterHandler("socket", handlerFactory{
45✔
242
                []reflect.Type{
45✔
243
                        reflect.TypeOf((*net.TCPListener)(nil)),
45✔
244
                        reflect.TypeOf((*net.UnixListener)(nil)),
45✔
245
                        reflect.TypeOf(tls.NewListener(nil, nil)),
45✔
246
                },
45✔
247
        })
45✔
248
}
45✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc