Coveralls logo
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

emqx / emqx / 6015

19 Jul 2019 - 6:19 coverage decreased (-0.2%) to 69.428%
6015

Pull #2699

travis-ci

web-flow
Update ekka tag
Pull Request #2699: Support K8S hostname auto discovery cluster

3361 of 4841 relevant lines covered (69.43%)

194.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.16
/src/emqx_ws_channel.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_ws_channel).
18

19
-include("emqx.hrl").
20
-include("emqx_mqtt.hrl").
21
-include("logger.hrl").
22

23
-logger_header("[WS Channel]").
24

25
-export([ info/1
26
        , attrs/1
27
        , stats/1
28
        , kick/1
29
        , session/1
30
        ]).
31

32
%% websocket callbacks
33
-export([ init/2
34
        , websocket_init/1
35
        , websocket_handle/2
36
        , websocket_info/2
37
        , terminate/3
38
        ]).
39

40
%% Per-connection state threaded through the cowboy websocket callbacks below.
-record(state, {
41
          request,       %% cowboy request captured in init/2; read by websocket_init/1
42
          options,       %% option proplist passed to init/2; forwarded to emqx_protocol:init/2
43
          peername,      %% result of cowboy_req:peer/1
44
          sockname,      %% result of cowboy_req:sock/1
45
          proto_state,   %% state returned and updated by the emqx_protocol calls
46
          parse_state,   %% emqx_frame parser state carried between binary frames
47
          keepalive,     %% value returned by emqx_keepalive:start/3; unset until keepalive starts
48
          enable_stats,  %% zone env 'enable_stats' (defaults to true in websocket_init/1)
49
          stats_timer,   %% pending emit_stats timer, or undefined when none is running
50
          idle_timeout,  %% zone env 'idle_timeout'; used as the delay of the emit_stats timer
51
          shutdown       %% reason stored by the {stop, Reason} message; read in terminate/3
52
         }).
53

54
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
55

56
%%--------------------------------------------------------------------
57
%% API
58
%%--------------------------------------------------------------------
59

60
%% for debug
61
%% Debugging information for a channel.
%% With a pid, the request is forwarded to that process through call/2.
%% With the channel state, the protocol info map is merged with the
%% connection details; the connection keys win on conflict.
info(WSPid) when is_pid(WSPid) ->
    call(WSPid, info);
info(#state{peername = Peer, sockname = Sock, proto_state = PState}) ->
    maps:merge(emqx_protocol:info(PState),
               #{socktype   => websocket,
                 conn_state => running,
                 peername   => Peer,
                 sockname   => Sock}).
1×
73

74
%% for dashboard
75
%% Connection attributes.
%% With a pid, the request is forwarded to that process through call/2.
%% With the channel state, the socket addresses are merged with the
%% protocol attributes; the protocol keys win on conflict.
attrs(WSPid) when is_pid(WSPid) ->
    call(WSPid, attrs);
attrs(#state{peername = Peer, sockname = Sock, proto_state = PState}) ->
    maps:merge(#{peername => Peer, sockname => Sock},
               emqx_protocol:attrs(PState)).
1×
85

86
%% Statistics of a channel as one flat list: socket counters first, then
%% process statistics, then protocol statistics.
%% With a pid, the request is forwarded to that process through call/2.
stats(WSPid) when is_pid(WSPid) ->
    call(WSPid, stats);
stats(#state{proto_state = PState}) ->
    wsock_stats() ++ emqx_misc:proc_stats() ++ emqx_protocol:stats(PState).
94

95
%% Ask the channel process to shut itself down; returns the channel's reply.
kick(WSPid)
  when is_pid(WSPid) ->
    call(WSPid, kick).
1×
97

98
%% Fetch the session held by the channel process's protocol state.
session(WSPid)
  when is_pid(WSPid) ->
    call(WSPid, session).
1×
100

101
%% Synchronous request to a channel process.
%% Sends {call, {self(), MonRef}, Req} and waits up to 5 seconds for
%% {MonRef, Reply}. Exits with the channel's exit reason if it goes down
%% first, or with 'timeout' if nothing arrives in time.
call(WSPid, Req) when is_pid(WSPid) ->
    MonRef = erlang:monitor(process, WSPid),
    From = {self(), MonRef},
    WSPid ! {call, From, Req},
    receive
        {MonRef, Reply} ->
            %% flush drops a 'DOWN' message that may already be queued
            erlang:demonitor(MonRef, [flush]),
            Reply;
        {'DOWN', MonRef, _Type, _Object, Reason} ->
            exit(Reason)
    after 5000 ->
        erlang:demonitor(MonRef, [flush]),
        exit(timeout)
    end.
114

115
%%--------------------------------------------------------------------
116
%% WebSocket callbacks
117
%%--------------------------------------------------------------------
118

119
%% cowboy handler entry point: decide whether to upgrade to WebSocket.
%%
%% Req  - the cowboy request.
%% Opts - option proplist; idle_timeout, deflate_options, max_frame_size
%%        and compress are read here, and the whole list is kept in the
%%        state for websocket_init/1.
%%
%% Returns {cowboy_websocket, Req, State, WsOptions} to upgrade, or replies
%% 400 when the sec-websocket-protocol header is present but is not a
%% single "mqtt..." entry.
init(Req, Opts) ->
    IdleTimeout = proplists:get_value(idle_timeout, Opts, 7200000),
    DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])),
    %% a configured size of 0 means "no limit"
    MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of
                       0 -> infinity;
                       MFS -> MFS
                   end,
    Compress = proplists:get_value(compress, Opts, false),
    Options = #{compress => Compress,
                deflate_opts => DeflateOptions,
                max_frame_size => MaxFrameSize,
                idle_timeout => IdleTimeout},
    %% websocket_init/1 reads both fields, so every upgrade path must carry
    %% them; an empty #state{} would make it crash on cowboy_req:peer(undefined).
    State = #state{request = Req, options = Opts},
    case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
        undefined ->
            %% no subprotocol requested: still accept the upgrade
            {cowboy_websocket, Req, State, Options};
        [<<"mqtt", Vsn/binary>>] ->
            %% echo the requested mqtt subprotocol back to the client
            Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt", Vsn/binary>>, Req),
            {cowboy_websocket, Resp, State, Options};
        _ ->
            {ok, cowboy_req:reply(400, Req), #state{}}
    end.
140

141
%% Called in the websocket process after the upgrade. Collects the
%% connection details from the request, initialises the protocol and
%% frame-parser state, and returns a fresh state record that no longer
%% holds the request or options.
websocket_init(#state{request = Req, options = Options}) ->
    Peername = cowboy_req:peer(Req),
    Sockname = cowboy_req:sock(Req),
    Peercert = cowboy_req:cert(Req),
    WsCookie = parse_ws_cookie(Req),
    ConnInfo = #{peername  => Peername,
                 sockname  => Sockname,
                 peercert  => Peercert,
                 sendfun   => send_fun(self()),
                 ws_cookie => WsCookie,
                 conn_mod  => ?MODULE},
    ProtoState = emqx_protocol:init(ConnInfo, Options),
    Zone = proplists:get_value(zone, Options),
    MaxSize = emqx_zone:get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE),
    ParseState = emqx_frame:initial_parse_state(#{max_size => MaxSize}),
    EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
    IdleTimeout = emqx_zone:get_env(Zone, idle_timeout, 30000),
    emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
    ok = emqx_misc:init_proc_mng_policy(Zone),
    {ok, #state{peername     = Peername,
                sockname     = Sockname,
                parse_state  = ParseState,
                proto_state  = ProtoState,
                enable_stats = EnableStats,
                idle_timeout = IdleTimeout}}.

%% Parse the request cookies; any failure is logged and yields 'undefined'
%% so that a bad cookie header does not abort the connection.
parse_ws_cookie(Req) ->
    try
        cowboy_req:parse_cookies(Req)
    catch
        error:badarg ->
            ?LOG(error, "Illegal cookie"),
            undefined;
        Class:Cause ->
            ?LOG(error,
                 "Cookie is parsed failed, Error: ~p, Reason ~p",
                 [Class, Cause]),
            undefined
    end.
175

176
%% Build the send callback handed to emqx_protocol. The returned fun
%% serialises a packet, bumps the send counters, posts the bytes to WsPid
%% as a {binary, Bin} message and returns {ok, SerialisedData}.
send_fun(WsPid) ->
    fun(Packet, Options) ->
        Serialized = emqx_frame:serialize(Packet, Options),
        Size = iolist_size(Serialized),
        emqx_pd:update_counter(send_cnt, 1),
        emqx_pd:update_counter(send_oct, Size),
        Frame = {binary, iolist_to_binary(Serialized)},
        WsPid ! Frame,
        {ok, Serialized}
    end.
185

186
%% Callback given to emqx_keepalive:start/3: reports the current value of
%% the recv_oct counter as {ok, Count}.
stat_fun() ->
    fun() ->
        RecvOct = emqx_pd:get_counter(recv_oct),
        {ok, RecvOct}
    end.
!
188

189
%% Handle a frame received from the client.
%% Empty binary frames carry nothing to parse; they only (re)arm the
%% stats timer.
websocket_handle({binary, Empty}, State)
  when Empty =:= <<>>; Empty =:= [<<>>] ->
    {ok, ensure_stats_timer(State)};
%% Non-empty binary frame: update the receive counters, then parse one
%% packet. Bytes left over after a packet are fed back into this function
%% once the protocol layer has accepted that packet.
websocket_handle({binary, Data}, State = #state{parse_state = ParseState}) ->
    ?LOG(debug, "RECV ~p", [Data]),
    Size = iolist_size(Data),
    emqx_pd:update_counter(recv_oct, Size),
    ok = emqx_metrics:inc('bytes.received', Size),
    try emqx_frame:parse(iolist_to_binary(Data), ParseState) of
        {ok, ParseState1} ->
            %% incomplete packet: keep the parser state and wait for more data
            {ok, State#state{parse_state = ParseState1}};
        {ok, Packet, Rest, ParseState1} ->
            ok = emqx_metrics:inc_recv(Packet),
            emqx_pd:update_counter(recv_cnt, 1),
            Continue = fun(State1) ->
                               websocket_handle({binary, Rest}, State1)
                       end,
            handle_incoming(Packet, Continue,
                            State#state{parse_state = ParseState1});
        {error, Reason} ->
            ?LOG(error, "Frame error: ~p", [Reason]),
            shutdown(Reason, State)
    catch
        error:Cause:Stack ->
            ?LOG(error, "Parse failed for ~p~n\
                 Stacktrace:~p~nFrame data: ~p", [Cause, Stack, Data]),
            shutdown(parse_error, State)
    end;
%% Pings are answered with pongs by cowboy itself and pongs can be ignored;
%% these clauses only keep such frames from reaching the catch-all below.
websocket_handle(ping, State) ->
    {ok, ensure_stats_timer(State)};
websocket_handle(pong, State) ->
    {ok, ensure_stats_timer(State)};
websocket_handle({ping, _Payload}, State) ->
    {ok, ensure_stats_timer(State)};
websocket_handle({pong, _Payload}, State) ->
    {ok, ensure_stats_timer(State)};
%% Any other data frame type shuts the connection down, following the MQTT
%% spec: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901285
websocket_handle({_OtherFrameType, _Payload}, State) ->
    ?LOG(error, "Frame error: Other type of data frame"),
    shutdown(other_frame_type, State).
1×
229

230
%% Handle Erlang messages sent to the websocket process.

%% --- synchronous requests issued through call/2 ---
websocket_info({call, From, info}, State) ->
    Info = info(State),
    gen_server:reply(From, Info),
    {ok, State};

websocket_info({call, From, attrs}, State) ->
    Attrs = attrs(State),
    gen_server:reply(From, Attrs),
    {ok, State};

websocket_info({call, From, stats}, State) ->
    Stats = stats(State),
    gen_server:reply(From, Stats),
    {ok, State};

%% The caller is answered before the shutdown is scheduled.
websocket_info({call, From, kick}, State) ->
    gen_server:reply(From, ok),
    shutdown(kick, State);

websocket_info({call, From, session}, State = #state{proto_state = PState}) ->
    Session = emqx_protocol:session(PState),
    gen_server:reply(From, Session),
    {ok, State};

%% --- outbound delivery ---
websocket_info({deliver, PubOrAck}, State = #state{proto_state = PState}) ->
    case emqx_protocol:deliver(PubOrAck, PState) of
        {ok, PState1} ->
            {ok, ensure_stats_timer(State#state{proto_state = PState1})};
        {error, Reason} ->
            shutdown(Reason, State)
    end;

%% --- stats timer fired: publish the stats, clear the timer, hibernate ---
%% Only matches when the timer reference is the one stored in the state.
websocket_info({timeout, TRef, emit_stats},
               State = #state{stats_timer = TRef, proto_state = PState}) ->
    ClientId = emqx_protocol:client_id(PState),
    emqx_cm:set_conn_stats(ClientId, stats(State)),
    {ok, State#state{stats_timer = undefined}, hibernate};

%% --- keepalive ---
websocket_info({keepalive, start, Interval}, State) ->
    ?LOG(debug, "Keepalive at the interval of ~p", [Interval]),
    case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
        {ok, Started} ->
            {ok, State#state{keepalive = Started}};
        {error, Error} ->
            ?LOG(warning, "Keepalive error: ~p", [Error]),
            shutdown(Error, State)
    end;

websocket_info({keepalive, check}, State = #state{keepalive = Current}) ->
    case emqx_keepalive:check(Current) of
        {ok, Checked} ->
            {ok, State#state{keepalive = Checked}};
        {error, timeout} ->
            ?LOG(debug, "Keepalive Timeout!"),
            shutdown(keepalive_timeout, State);
        {error, Error} ->
            ?LOG(error, "Keepalive error: ~p", [Error]),
            shutdown(keepalive_error, State)
    end;

%% --- shutdown requests naming the process that caused them ---
websocket_info({shutdown, discard, {ClientId, ByPid}}, State) ->
    ?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]),
    shutdown(discard, State);

websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
    ?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]),
    shutdown(conflict, State);

%% --- bytes posted by the fun from send_fun/1: write them to the socket ---
websocket_info({binary, Data}, State) ->
    Frame = {binary, Data},
    {reply, Frame, State};

websocket_info({shutdown, Reason}, State) ->
    shutdown(Reason, State);

%% --- posted by shutdown/2: record the reason for terminate/3 and stop ---
websocket_info({stop, Reason}, State) ->
    {stop, State#state{shutdown = Reason}};

%% --- anything else is logged and ignored ---
websocket_info(Info, State) ->
    ?LOG(error, "Unexpected info: ~p", [Info]),
    {ok, State}.
!
305

306
%% cowboy termination callback: log, cancel the keepalive and hand the
%% protocol state to emqx_protocol:terminate/2.
terminate(SockError, _Req, #state{keepalive   = Keepalive,
                                  proto_state = ProtoState,
                                  shutdown    = Shutdown}) ->
    ?LOG(debug, "Terminated for ~p, sockerror: ~p",
         [Shutdown, SockError]),
    emqx_keepalive:cancel(Keepalive),
    terminate_protocol(ProtoState, Shutdown, SockError).

%% No protocol state was ever created: nothing to clean up.
terminate_protocol(undefined, _Shutdown, _SockError) ->
    ok;
%% Shutdown recorded as {shutdown, Reason}: terminate the protocol with the
%% inner reason and exit with it.
terminate_protocol(ProtoState, {shutdown, Reason}, _SockError) ->
    emqx_protocol:terminate(Reason, ProtoState),
    exit(Reason);
%% Any other recorded value: pass it to the protocol, then exit with the
%% socket error reported by cowboy.
terminate_protocol(ProtoState, Error, SockError) ->
    emqx_protocol:terminate(Error, ProtoState),
    exit({error, SockError}).
321

322
%%--------------------------------------------------------------------
323
%% Internal functions
324
%%--------------------------------------------------------------------
325

326
%% Pass one parsed packet to the protocol layer.
%% On success the continuation is invoked with the updated state; every
%% other outcome schedules a shutdown, keeping the new protocol state when
%% one is returned.
handle_incoming(Packet, Continue, State = #state{proto_state = PState}) ->
    case emqx_protocol:received(Packet, PState) of
        {ok, PState1} ->
            Continue(State#state{proto_state = PState1});
        {error, Reason} ->
            ?LOG(error, "Protocol error: ~p", [Reason]),
            shutdown(Reason, State);
        {error, Reason, PState1} ->
            shutdown(Reason, State#state{proto_state = PState1});
        {stop, Error, PState1} ->
            shutdown(Error, State#state{proto_state = PState1})
    end.
6×
338

UNCOV
339
%% Start the emit_stats timer when stats are enabled and no timer is
%% pending; otherwise return the state untouched.
ensure_stats_timer(State) ->
    case State of
        #state{enable_stats = true,
               stats_timer  = undefined,
               idle_timeout = Timeout} ->
            TRef = emqx_misc:start_timer(Timeout, emit_stats),
            State#state{stats_timer = TRef};
        _ ->
            State
    end.
!
345

346
%% Schedule a shutdown by posting a stop message to this process instead of
%% stopping inline; websocket_info/2 handles {stop, _} afterwards.
%% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696)
shutdown(Reason, State) ->
    StopMsg = {stop, {shutdown, Reason}},
    self() ! StopMsg,
    {ok, State}.
350

4×
351
%% Current value of every counter named in ?SOCK_STATS, as {Key, Value}
%% pairs in the order the macro lists them.
wsock_stats() ->
    lists:map(fun(Key) -> {Key, emqx_pd:get_counter(Key)} end,
              ?SOCK_STATS).
4×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC