Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

emqx / emqx / 5609

8 May 2019 - 13:18 coverage decreased (-0.2%) to 68.601%
5609

Pull #2521

travis-ci

9181eb84f9c35729a3bad740fb7f9d93?size=18&default=identiconweb-flow
Restore app.confg rule in makefile for debug and test
Pull Request #2521: Auto-pull-request-by-2019-05-09

2 of 7 new or added lines in 1 file covered. (28.57%)

23 existing lines in 7 files now uncovered.

3452 of 5032 relevant lines covered (68.6%)

208.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.62
/src/emqx_client.erl
1
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
2
%%
3
%% Licensed under the Apache License, Version 2.0 (the "License");
4
%% you may not use this file except in compliance with the License.
5
%% You may obtain a copy of the License at
6
%%
7
%%     http://www.apache.org/licenses/LICENSE-2.0
8
%%
9
%% Unless required by applicable law or agreed to in writing, software
10
%% distributed under the License is distributed on an "AS IS" BASIS,
11
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
%% See the License for the specific language governing permissions and
13
%% limitations under the License.
14

15
-module(emqx_client).
16

17
-behaviour(gen_statem).
18

19
-include("logger.hrl").
20
-include("types.hrl").
21
-include("emqx_client.hrl").
22

23
-export([start_link/0, start_link/1]).
24

25
-export([ connect/1
26
        , disconnect/1
27
        , disconnect/2
28
        , disconnect/3
29
        ]).
30

31
-export([ping/1]).
32

33
%% PubSub
34
-export([ subscribe/2
35
        , subscribe/3
36
        , subscribe/4
37
        , publish/2
38
        , publish/3
39
        , publish/4
40
        , publish/5
41
        , unsubscribe/2
42
        , unsubscribe/3
43
        ]).
44

45
%% Puback...
46
-export([ puback/2
47
        , puback/3
48
        , puback/4
49
        , pubrec/2
50
        , pubrec/3
51
        , pubrec/4
52
        , pubrel/2
53
        , pubrel/3
54
        , pubrel/4
55
        , pubcomp/2
56
        , pubcomp/3
57
        , pubcomp/4
58
        ]).
59

60
-export([subscriptions/1]).
61

62
-export([info/1, stop/1]).
63

64
%% For test cases
65
-export([pause/1, resume/1]).
66

67
-export([ initialized/3
68
        , waiting_for_connack/3
69
        , connected/3
70
        , inflight_full/3
71
        ]).
72

73
-export([ init/1
74
        , callback_mode/0
75
        , handle_event/4
76
        , terminate/3
77
        , code_change/4
78
        ]).
79

80
-export_type([ host/0
81
             , client/0
82
             , option/0
83
             , properties/0
84
             , payload/0
85
             , pubopt/0
86
             , subopt/0
87
             , mqtt_msg/0
88
             ]).
89

90
%% Default timeout
91
-define(DEFAULT_KEEPALIVE,       60).
92
-define(DEFAULT_ACK_TIMEOUT,     30000).
93
-define(DEFAULT_CONNECT_TIMEOUT, 60000).
94

95
-define(PROPERTY(Name, Val), #state{properties = #{Name := Val}}).
96

97
-define(WILL_MSG(QoS, Retain, Topic, Props, Payload),
98
        #mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}).
99

100
-define(NO_CLIENT_ID, <<>>).
101

102
-type(host() :: inet:ip_address() | inet:hostname()).
103

104
%% Message handler is a set of callbacks defined to handle MQTT messages
105
%% as well as the disconnect event.
106
-define(NO_MSG_HDLR, undefined).
107
-type(msg_handler() :: #{puback := fun((_) -> any()),
108
                         publish := fun((emqx_types:message()) -> any()),
109
                         disconnected := fun(({reason_code(), _Properties :: term()}) -> any())
110
                        }).
111

112
-type(option() :: {name, atom()}
113
                | {owner, pid()}
114
                | {msg_handler, msg_handler()}
115
                | {host, host()}
116
                | {hosts, [{host(), inet:port_number()}]}
117
                | {port, inet:port_number()}
118
                | {tcp_opts, [gen_tcp:option()]}
119
                | {ssl, boolean()}
120
                | {ssl_opts, [ssl:ssl_option()]}
121
                | {connect_timeout, pos_integer()}
122
                | {bridge_mode, boolean()}
123
                | {client_id, iodata()}
124
                | {clean_start, boolean()}
125
                | {username, iodata()}
126
                | {password, iodata()}
127
                | {proto_ver, v3 | v4 | v5}
128
                | {keepalive, non_neg_integer()}
129
                | {max_inflight, pos_integer()}
130
                | {retry_interval, timeout()}
131
                | {will_topic, iodata()}
132
                | {will_payload, iodata()}
133
                | {will_retain, boolean()}
134
                | {will_qos, qos()}
135
                | {will_props, properties()}
136
                | {auto_ack, boolean()}
137
                | {ack_timeout, pos_integer()}
138
                | {force_ping, boolean()}
139
                | {properties, properties()}).
140

141
-type(mqtt_msg() :: #mqtt_msg{}).
142

143
%% Client process state.
%% init/1 builds the initial record without owner, socket, ack_timer,
%% retry_timer, session_present and parse_state, so those fields start out
%% as 'undefined'; their types are wrapped in maybe() to say so.
%% retry_interval is initialised to 0, hence non_neg_integer().
-record(state, {name            :: atom(),
                owner           :: maybe(pid()),
                msg_handler     :: ?NO_MSG_HDLR | msg_handler(),
                host            :: host(),
                port            :: inet:port_number(),
                hosts           :: [{host(), inet:port_number()}],
                socket          :: maybe(inet:socket()),
                sock_opts       :: [emqx_client_sock:option()],
                connect_timeout :: pos_integer(),
                bridge_mode     :: boolean(),
                client_id       :: binary(),
                clean_start     :: boolean(),
                username        :: maybe(binary()),
                password        :: maybe(binary()),
                proto_ver       :: emqx_mqtt_types:version(),
                proto_name      :: iodata(),
                keepalive       :: non_neg_integer(),
                keepalive_timer :: maybe(reference()),
                force_ping      :: boolean(),
                paused          :: boolean(),
                will_flag       :: boolean(),
                will_msg        :: mqtt_msg(),
                properties      :: properties(),
                pending_calls   :: list(),
                subscriptions   :: map(),
                max_inflight    :: infinity | pos_integer(),
                inflight        :: emqx_inflight:inflight(),
                awaiting_rel    :: map(),
                auto_ack        :: boolean(),
                ack_timeout     :: pos_integer(),
                ack_timer       :: maybe(reference()),
                retry_interval  :: non_neg_integer(),
                retry_timer     :: maybe(reference()),
                session_present :: maybe(boolean()),
                last_packet_id  :: packet_id(),
                parse_state     :: maybe(emqx_frame:state())}).
179

180
-record(call, {id, from, req, ts}).
181

182
-type(client() :: pid() | atom()).
183

184
-type(topic() :: emqx_topic:topic()).
185

186
-type(payload() :: iodata()).
187

188
-type(packet_id() :: emqx_mqtt_types:packet_id()).
189

190
-type(properties() :: emqx_mqtt_types:properties()).
191

192
-type(qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos()).
193

194
-type(pubopt() :: {retain, boolean()} | {qos, qos()} | {timeout, timeout()}).
195

196
-type(subopt() :: {rh, 0 | 1 | 2}
197
                | {rap, boolean()}
198
                | {nl,  boolean()}
199
                | {qos, qos()}).
200

201
-type(reason_code() :: emqx_mqtt_types:reason_code()).
202

203
-type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}).
204

205
%%------------------------------------------------------------------------------
206
%% API
207
%%------------------------------------------------------------------------------
208

209
%% @doc Start a client with an empty option list; same as start_link([]).
-spec(start_link() -> gen_statem:start_ret()).
start_link() ->
    NoOptions = [],
    start_link(NoOptions).
211

212
%% @doc Start a client process linked to the caller.
%% Options may be a map (converted to a list first) or an option list.
%% The `properties' option (default #{}) must pass
%% emqx_mqtt_props:validate/1. When a `name' option is present the process
%% is registered locally under that name.
-spec(start_link(map() | [option()]) -> gen_statem:start_ret()).
start_link(Options) when is_map(Options) ->
    AsList = maps:to_list(Options),
    start_link(AsList);
start_link(Options) when is_list(Options) ->
    Properties = proplists:get_value(properties, Options, #{}),
    ok = emqx_mqtt_props:validate(Properties),
    RegName = proplists:get_value(name, Options),
    case RegName of
        undefined ->
            %% Anonymous (unregistered) client
            gen_statem:start_link(?MODULE, [with_owner(Options)], []);
        Name when is_atom(Name) ->
            gen_statem:start_link({local, Name}, ?MODULE, [with_owner(Options)], [])
    end.
224

225
%% Make sure the option list carries an `owner' pid.
%% An existing pid is kept as is; when the option is absent the calling
%% process is prepended as the owner. Any other value crashes (case_clause).
with_owner(Options) ->
    Current = proplists:get_value(owner, Options),
    case Current of
        undefined ->
            [{owner, self()} | Options];
        Pid when is_pid(Pid) ->
            Options
    end.
230

231
%% @doc Ask the client process to connect.
%% The call is made with an infinite gen_statem call timeout.
-spec(connect(client()) -> {ok, properties()} | {error, term()}).
connect(Client) ->
    CallTimeout = infinity,
    gen_statem:call(Client, connect, CallTimeout).
234

235
%% @doc Subscribe to one topic or to a list of topics, without properties.
%% Accepted forms of the second argument:
%%   Topic                       - binary, subscribed with ?QOS_0
%%   {Topic, QoS}                - QoS as an atom name or an integer
%%   {Topic, [subopt()]}         - explicit subscription options
%%   [{Topic, QoS | [subopt()]}] - several topics at once
%% The {Topic, [subopt()]} form was advertised by the spec but had no
%% matching clause (function_clause); it is now routed through the list form.
-spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]}
                        | [{topic(), qos() | [subopt()]}])
      -> subscribe_ret()).
subscribe(Client, Topic) when is_binary(Topic) ->
    subscribe(Client, {Topic, ?QOS_0});
subscribe(Client, {Topic, QoS}) when is_binary(Topic), is_atom(QoS) ->
    subscribe(Client, {Topic, ?QOS_I(QoS)});
subscribe(Client, {Topic, QoS}) when is_binary(Topic), ?IS_QOS(QoS) ->
    subscribe(Client, [{Topic, ?QOS_I(QoS)}]);
subscribe(Client, {Topic, Opts}) when is_binary(Topic), is_list(Opts) ->
    subscribe(Client, [{Topic, Opts}]);
subscribe(Client, Topics) when is_list(Topics) ->
    %% Normalise every entry to {Topic, OptionList} before delegating.
    Normalise =
        fun({Topic, QoS}) when is_binary(Topic), is_atom(QoS) ->
                {Topic, [{qos, ?QOS_I(QoS)}]};
           ({Topic, QoS}) when is_binary(Topic), ?IS_QOS(QoS) ->
                {Topic, [{qos, ?QOS_I(QoS)}]};
           ({Topic, Opts}) when is_binary(Topic), is_list(Opts) ->
                {Topic, Opts}
        end,
    subscribe(Client, #{}, lists:map(Normalise, Topics)).
252

253
%% @doc Subscribe to a single topic with a QoS or option list, or to a
%% list of topics with MQTT properties.
%% In the properties form each topic entry may carry either an option list
%% or, as the spec states, a bare QoS. A bare QoS used to reach
%% parse_subopt/1 unwrapped and crash; it is now wrapped as [{qos, QoS}].
%% Entries that are not 2-tuples are skipped by the comprehension, as before.
-spec(subscribe(client(), topic(), qos() | [subopt()]) ->
                subscribe_ret();
               (client(), properties(), [{topic(), qos() | [subopt()]}]) ->
                subscribe_ret()).
subscribe(Client, Topic, QoS) when is_binary(Topic), is_atom(QoS) ->
    subscribe(Client, Topic, ?QOS_I(QoS));
subscribe(Client, Topic, QoS) when is_binary(Topic), ?IS_QOS(QoS) ->
    subscribe(Client, Topic, [{qos, QoS}]);
subscribe(Client, Topic, Opts) when is_binary(Topic), is_list(Opts) ->
    subscribe(Client, #{}, [{Topic, Opts}]);
subscribe(Client, Properties, Topics) when is_map(Properties), is_list(Topics) ->
    AsOpts = fun(Opts) when is_list(Opts) -> Opts;
                (QoS) -> [{qos, QoS}]
             end,
    Topics1 = [{Topic, parse_subopt(AsOpts(Opts))} || {Topic, Opts} <- Topics],
    gen_statem:call(Client, {subscribe, Properties, Topics1}).
266

267
%% @doc Subscribe to a single topic with MQTT properties.
%% The last argument is a QoS (atom name or integer) or an option list;
%% a QoS is first reduced to [{qos, QoS}] and the request is then handed
%% to subscribe/3 as a one-element topic list.
-spec(subscribe(client(), properties(), topic(), qos() | [subopt()])
      -> subscribe_ret()).
subscribe(Client, Properties, Topic, QoS)
    when is_map(Properties), is_binary(Topic), is_atom(QoS) ->
    QoSInt = ?QOS_I(QoS),
    subscribe(Client, Properties, Topic, QoSInt);
subscribe(Client, Properties, Topic, QoS)
    when is_map(Properties), is_binary(Topic), ?IS_QOS(QoS) ->
    SubOpts = [{qos, QoS}],
    subscribe(Client, Properties, Topic, SubOpts);
subscribe(Client, Properties, Topic, Opts)
    when is_map(Properties), is_binary(Topic), is_list(Opts) ->
    TopicList = [{Topic, Opts}],
    subscribe(Client, Properties, TopicList).
278

279
%% Turn a subscription option list into a map, starting from the defaults
%% rh = 0, rap = 0, nl = 0 and qos = ?QOS_0.
parse_subopt(Opts) ->
    Defaults = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0},
    parse_subopt(Opts, Defaults).
281

282
%% Apply subscription options, left to right, onto the accumulator map.
%% Later occurrences of an option override earlier ones. Booleans for
%% `rap' and `nl' are stored as 1/0. Any option not listed here crashes
%% with function_clause.
parse_subopt([], Result) ->
    Result;
%% rh must be one of the integers 0, 1, 2 (see subopt()); the is_integer
%% check keeps floats such as 1.5 from slipping through the range test.
parse_subopt([{rh, I} | Opts], Result) when is_integer(I), I >= 0, I =< 2 ->
    parse_subopt(Opts, Result#{rh := I});
parse_subopt([{rap, true} | Opts], Result) ->
    parse_subopt(Opts, Result#{rap := 1});
parse_subopt([{rap, false} | Opts], Result) ->
    parse_subopt(Opts, Result#{rap := 0});
parse_subopt([{nl, true} | Opts], Result) ->
    parse_subopt(Opts, Result#{nl := 1});
parse_subopt([{nl, false} | Opts], Result) ->
    parse_subopt(Opts, Result#{nl := 0});
parse_subopt([{qos, QoS} | Opts], Result) ->
    parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}).
296

297
%% @doc Publish Payload to Topic with ?QOS_0.
%% The payload is flattened to a binary before the message is built.
-spec(publish(client(), topic(), payload()) -> ok | {error, term()}).
publish(Client, Topic, Payload) when is_binary(Topic) ->
    Msg = #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)},
    publish(Client, Msg).
300

301
%% @doc Publish Payload to Topic with a QoS or a list of publish options.
%% A QoS (atom name or integer) is wrapped as [{qos, QoS}]; the option
%% list form delegates to publish/5 with empty properties.
-spec(publish(client(), topic(), payload(), qos() | [pubopt()])
        -> ok | {ok, packet_id()} | {error, term()}).
publish(Client, Topic, Payload, QoS) when is_binary(Topic), is_atom(QoS) ->
    PubOpts = [{qos, ?QOS_I(QoS)}],
    publish(Client, Topic, Payload, PubOpts);
publish(Client, Topic, Payload, QoS) when is_binary(Topic), ?IS_QOS(QoS) ->
    PubOpts = [{qos, QoS}],
    publish(Client, Topic, Payload, PubOpts);
publish(Client, Topic, Payload, Opts) when is_binary(Topic), is_list(Opts) ->
    NoProperties = #{},
    publish(Client, Topic, NoProperties, Payload, Opts).
309

310
%% @doc Publish Payload to Topic with MQTT properties and publish options.
%% Properties must pass emqx_mqtt_props:validate/1. `retain' is read as a
%% boolean option and `qos' defaults to ?QOS_0.
-spec(publish(client(), topic(), properties(), payload(), [pubopt()])
      -> ok | {ok, packet_id()} | {error, term()}).
publish(Client, Topic, Properties, Payload, Opts)
    when is_binary(Topic), is_map(Properties), is_list(Opts) ->
    ok = emqx_mqtt_props:validate(Properties),
    Retain = proplists:get_bool(retain, Opts),
    RequestedQoS = proplists:get_value(qos, Opts, ?QOS_0),
    QoS = ?QOS_I(RequestedQoS),
    Msg = #mqtt_msg{topic   = Topic,
                    qos     = QoS,
                    retain  = Retain,
                    props   = Properties,
                    payload = iolist_to_binary(Payload)},
    publish(Client, Msg).
322

323
%% @doc Hand a ready-made #mqtt_msg{} to the client process as a
%% synchronous {publish, Msg} call.
-spec(publish(client(), #mqtt_msg{}) -> ok | {ok, packet_id()} | {error, term()}).
publish(Client, Msg) ->
    Request = {publish, Msg},
    gen_statem:call(Client, Request).
326

327
%% @doc Unsubscribe from one topic or a list of topics, without properties.
-spec(unsubscribe(client(), topic() | [topic()]) -> subscribe_ret()).
unsubscribe(Client, Topic) when is_binary(Topic) ->
    TopicList = [Topic],
    unsubscribe(Client, TopicList);
unsubscribe(Client, Topics) when is_list(Topics) ->
    NoProperties = #{},
    unsubscribe(Client, NoProperties, Topics).
332

333
%% @doc Unsubscribe from one topic or a list of topics with MQTT properties.
%% A single topic is wrapped in a list; the list form is sent to the client
%% process as a synchronous {unsubscribe, Properties, Topics} call.
-spec(unsubscribe(client(), properties(), topic() | [topic()]) -> subscribe_ret()).
unsubscribe(Client, Properties, Topic) when is_map(Properties), is_binary(Topic) ->
    TopicList = [Topic],
    unsubscribe(Client, Properties, TopicList);
unsubscribe(Client, Properties, Topics) when is_map(Properties), is_list(Topics) ->
    Request = {unsubscribe, Properties, Topics},
    gen_statem:call(Client, Request).
338

339
%% @doc Issue a synchronous `ping' call to the client process.
-spec(ping(client()) -> pong).
ping(Client) ->
    Request = ping,
    gen_statem:call(Client, Request).
342

343
%% @doc Disconnect with reason code ?RC_SUCCESS and no properties.
-spec(disconnect(client()) -> ok).
disconnect(Client) ->
    ReasonCode = ?RC_SUCCESS,
    disconnect(Client, ReasonCode).

%% @doc Disconnect with the given reason code and no properties.
-spec(disconnect(client(), reason_code()) -> ok).
disconnect(Client, ReasonCode) ->
    NoProperties = #{},
    disconnect(Client, ReasonCode, NoProperties).

%% @doc Disconnect with a reason code and properties; sent to the client
%% process as a synchronous {disconnect, ReasonCode, Properties} call.
-spec(disconnect(client(), reason_code(), properties()) -> ok).
disconnect(Client, ReasonCode, Properties) ->
    Request = {disconnect, ReasonCode, Properties},
    gen_statem:call(Client, Request).
354

355
%%------------------------------------------------------------------------------
356
%% For test cases
357
%%------------------------------------------------------------------------------
358

359
%% puback/2,3,4: cast a {puback, PacketId, ReasonCode, Properties} request
%% to the client process. ReasonCode defaults to ?RC_SUCCESS and
%% Properties to #{}.
puback(Client, PacketId) when is_integer(PacketId) ->
    DefaultCode = ?RC_SUCCESS,
    puback(Client, PacketId, DefaultCode).
puback(Client, PacketId, ReasonCode)
    when is_integer(PacketId), is_integer(ReasonCode) ->
    NoProperties = #{},
    puback(Client, PacketId, ReasonCode, NoProperties).
puback(Client, PacketId, ReasonCode, Properties)
    when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
    Request = {puback, PacketId, ReasonCode, Properties},
    gen_statem:cast(Client, Request).
367

368
%% pubrec/2,3,4: cast a {pubrec, PacketId, ReasonCode, Properties} request
%% to the client process. ReasonCode defaults to ?RC_SUCCESS and
%% Properties to #{}.
pubrec(Client, PacketId) when is_integer(PacketId) ->
    DefaultCode = ?RC_SUCCESS,
    pubrec(Client, PacketId, DefaultCode).
pubrec(Client, PacketId, ReasonCode)
    when is_integer(PacketId), is_integer(ReasonCode) ->
    NoProperties = #{},
    pubrec(Client, PacketId, ReasonCode, NoProperties).
pubrec(Client, PacketId, ReasonCode, Properties)
    when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
    Request = {pubrec, PacketId, ReasonCode, Properties},
    gen_statem:cast(Client, Request).
376

377
%% pubrel/2,3,4: cast a {pubrel, PacketId, ReasonCode, Properties} request
%% to the client process. ReasonCode defaults to ?RC_SUCCESS and
%% Properties to #{}.
pubrel(Client, PacketId) when is_integer(PacketId) ->
    DefaultCode = ?RC_SUCCESS,
    pubrel(Client, PacketId, DefaultCode).
pubrel(Client, PacketId, ReasonCode)
    when is_integer(PacketId), is_integer(ReasonCode) ->
    NoProperties = #{},
    pubrel(Client, PacketId, ReasonCode, NoProperties).
pubrel(Client, PacketId, ReasonCode, Properties)
    when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
    Request = {pubrel, PacketId, ReasonCode, Properties},
    gen_statem:cast(Client, Request).
385

386
%% pubcomp/2,3,4: cast a {pubcomp, PacketId, ReasonCode, Properties} request
%% to the client process. ReasonCode defaults to ?RC_SUCCESS and
%% Properties to #{}.
pubcomp(Client, PacketId) when is_integer(PacketId) ->
    DefaultCode = ?RC_SUCCESS,
    pubcomp(Client, PacketId, DefaultCode).
pubcomp(Client, PacketId, ReasonCode)
    when is_integer(PacketId), is_integer(ReasonCode) ->
    NoProperties = #{},
    pubcomp(Client, PacketId, ReasonCode, NoProperties).
pubcomp(Client, PacketId, ReasonCode, Properties)
    when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
    Request = {pubcomp, PacketId, ReasonCode, Properties},
    gen_statem:cast(Client, Request).
394

395
%% Synchronous `subscriptions' call to the client process.
subscriptions(Client) ->
    Request = subscriptions,
    gen_statem:call(Client, Request).

%% Synchronous `info' call to the client process.
info(Client) ->
    Request = info,
    gen_statem:call(Client, Request).

%% Synchronous `stop' call to the client process.
stop(Client) ->
    Request = stop,
    gen_statem:call(Client, Request).

%% Synchronous `pause' call to the client process (exported for test cases).
pause(Client) ->
    Request = pause,
    gen_statem:call(Client, Request).

%% Synchronous `resume' call to the client process (exported for test cases).
resume(Client) ->
    Request = resume,
    gen_statem:call(Client, Request).
409

410
%%------------------------------------------------------------------------------
411
%% gen_statem callbacks
412
%%------------------------------------------------------------------------------
413

414
%% gen_statem init callback.
%% Traps exits, picks the initial client id, folds the option list over a
%% record of defaults (init/2) and enters the `initialized' state with a
%% fresh frame parser state.
%% Client id: an explicit client_id option wins; without one, v5 uses
%% ?NO_CLIENT_ID and every other protocol version gets a random id.
init([Options]) ->
    process_flag(trap_exit, true),
    ProtoVer = proplists:get_value(proto_ver, Options, v4),
    GivenId  = proplists:get_value(client_id, Options),
    ClientId = case GivenId of
                   undefined when ProtoVer =:= v5 -> ?NO_CLIENT_ID;
                   undefined                      -> random_client_id();
                   _                              -> iolist_to_binary(GivenId)
               end,
    Defaults = #state{%% connection
                      host            = {127,0,0,1},
                      port            = 1883,
                      hosts           = [],
                      sock_opts       = [],
                      connect_timeout = ?DEFAULT_CONNECT_TIMEOUT,
                      %% protocol
                      bridge_mode     = false,
                      client_id       = ClientId,
                      clean_start     = true,
                      proto_ver       = ?MQTT_PROTO_V4,
                      proto_name      = <<"MQTT">>,
                      keepalive       = ?DEFAULT_KEEPALIVE,
                      force_ping      = false,
                      properties      = #{},
                      %% will message
                      will_flag       = false,
                      will_msg        = #mqtt_msg{},
                      %% session bookkeeping
                      paused          = false,
                      pending_calls   = [],
                      subscriptions   = #{},
                      max_inflight    = infinity,
                      inflight        = emqx_inflight:new(0),
                      awaiting_rel    = #{},
                      auto_ack        = true,
                      ack_timeout     = ?DEFAULT_ACK_TIMEOUT,
                      retry_interval  = 0,
                      last_packet_id  = 1},
    Configured = init(Options, Defaults),
    {ok, initialized, init_parse_state(Configured)}.
448

449
%% Build a client id of the form
%%   <<"emqx-client-", Hostname, "-", 20 lowercase hex digits>>
%% (12 digits from a 48-bit value followed by 8 digits from a 32-bit value).
%% The rand module seeds itself per process on first use, so the generator
%% is no longer re-seeded from the wall clock on every call. The ranges are
%% written as exact integer shifts instead of rounded float powers.
random_client_id() ->
    I1 = rand:uniform(1 bsl 48) - 1,
    I2 = rand:uniform(1 bsl 32) - 1,
    {ok, Host} = inet:gethostname(),
    iolist_to_binary(["emqx-client-", Host, "-", io_lib:format("~12.16.0b~8.16.0b", [I1, I2])]).
455

456
%% Fold the option list into the #state{} record, one option per clause.
%% Options that are not recognised, or whose guard does not hold, reach
%% the last clause and are skipped.
init([], State) ->
    State;
init([{name, Name} | Opts], State) ->
    init(Opts, State#state{name = Name});
init([{owner, Owner} | Opts], State) when is_pid(Owner) ->
    %% The client links itself to its owner
    link(Owner),
    init(Opts, State#state{owner = Owner});
init([{msg_handler, Hdlr} | Opts], State) ->
    init(Opts, State#state{msg_handler = Hdlr});
init([{host, Host} | Opts], State) ->
    init(Opts, State#state{host = Host});
init([{port, Port} | Opts], State) ->
    init(Opts, State#state{port = Port});
init([{hosts, Hosts} | Opts], State) ->
    %% Keep the hosts in the order the caller listed them: the previous
    %% fold prepended each entry to an accumulator and so stored the list
    %% reversed. An entry that is not a {Host, Port} pair is taken as a
    %% bare host and paired with port 1883.
    Hosts1 = lists:map(fun({Host, Port}) -> {Host, Port};
                          (Host)         -> {Host, 1883}
                       end, Hosts),
    init(Opts, State#state{hosts = Hosts1});
init([{tcp_opts, TcpOpts} | Opts], State = #state{sock_opts = SockOpts}) ->
    init(Opts, State#state{sock_opts = emqx_misc:merge_opts(SockOpts, TcpOpts)});
init([{ssl, EnableSsl} | Opts], State) ->
    %% Re-queue as ssl_opts followed by ssl so that the ssl_opts clause
    %% below handles both options together.
    case lists:keytake(ssl_opts, 1, Opts) of
        {value, SslOpts, WithOutSslOpts} ->
            init([SslOpts, {ssl, EnableSsl}| WithOutSslOpts], State);
        false ->
            init([{ssl_opts, []}, {ssl, EnableSsl}| Opts], State)
    end;
init([{ssl_opts, SslOpts} | Opts], State = #state{sock_opts = SockOpts}) ->
    %% ssl_opts only take effect together with {ssl, true}
    case lists:keytake(ssl, 1, Opts) of
        {value, {ssl, true}, WithOutEnableSsl} ->
            ok = ssl:start(),
            SockOpts1 = emqx_misc:merge_opts(SockOpts, [{ssl_opts, SslOpts}]),
            init(WithOutEnableSsl, State#state{sock_opts = SockOpts1});
        {value, {ssl, false}, WithOutEnableSsl} ->
            init(WithOutEnableSsl, State);
        false ->
            init(Opts, State)
    end;
init([{client_id, ClientId} | Opts], State) ->
    init(Opts, State#state{client_id = iolist_to_binary(ClientId)});
init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) ->
    init(Opts, State#state{clean_start = CleanStart});
init([{username, Username} | Opts], State) ->
    init(Opts, State#state{username = iolist_to_binary(Username)});
init([{password, Password} | Opts], State) ->
    init(Opts, State#state{password = iolist_to_binary(Password)});
init([{keepalive, Secs} | Opts], State) ->
    init(Opts, State#state{keepalive = Secs});
init([{proto_ver, v3} | Opts], State) ->
    init(Opts, State#state{proto_ver  = ?MQTT_PROTO_V3,
                           proto_name = <<"MQIsdp">>});
init([{proto_ver, v4} | Opts], State) ->
    init(Opts, State#state{proto_ver  = ?MQTT_PROTO_V4,
                           proto_name = <<"MQTT">>});
init([{proto_ver, v5} | Opts], State) ->
    init(Opts, State#state{proto_ver  = ?MQTT_PROTO_V5,
                           proto_name = <<"MQTT">>});
init([{will_topic, Topic} | Opts], State = #state{will_msg = WillMsg}) ->
    %% Only the will topic switches the will flag on
    WillMsg1 = init_will_msg({topic, Topic}, WillMsg),
    init(Opts, State#state{will_flag = true, will_msg = WillMsg1});
init([{will_props, Properties} | Opts], State = #state{will_msg = WillMsg}) ->
    init(Opts, State#state{will_msg = init_will_msg({props, Properties}, WillMsg)});
init([{will_payload, Payload} | Opts], State = #state{will_msg = WillMsg}) ->
    init(Opts, State#state{will_msg = init_will_msg({payload, Payload}, WillMsg)});
init([{will_retain, Retain} | Opts], State = #state{will_msg = WillMsg}) ->
    init(Opts, State#state{will_msg = init_will_msg({retain, Retain}, WillMsg)});
init([{will_qos, QoS} | Opts], State = #state{will_msg = WillMsg}) ->
    init(Opts, State#state{will_msg = init_will_msg({qos, QoS}, WillMsg)});
%% connect_timeout, ack_timeout and retry_interval are given in seconds and
%% stored in milliseconds.
init([{connect_timeout, Timeout}| Opts], State) ->
    init(Opts, State#state{connect_timeout = timer:seconds(Timeout)});
init([{ack_timeout, Timeout}| Opts], State) ->
    init(Opts, State#state{ack_timeout = timer:seconds(Timeout)});
init([force_ping | Opts], State) ->
    init(Opts, State#state{force_ping = true});
init([{force_ping, ForcePing} | Opts], State) when is_boolean(ForcePing) ->
    init(Opts, State#state{force_ping = ForcePing});
init([{properties, Properties} | Opts], State = #state{properties = InitProps}) ->
    init(Opts, State#state{properties = maps:merge(InitProps, Properties)});
init([{max_inflight, infinity} | Opts], State) ->
    init(Opts, State#state{max_inflight = infinity,
                           inflight     = emqx_inflight:new(0)});
init([{max_inflight, I} | Opts], State) when is_integer(I) ->
    init(Opts, State#state{max_inflight = I,
                           inflight     = emqx_inflight:new(I)});
init([auto_ack | Opts], State) ->
    init(Opts, State#state{auto_ack = true});
init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) ->
    init(Opts, State#state{auto_ack = AutoAck});
init([{retry_interval, I} | Opts], State) ->
    init(Opts, State#state{retry_interval = timer:seconds(I)});
init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) ->
    init(Opts, State#state{bridge_mode = Mode});
init([_Opt | Opts], State) ->
    init(Opts, State).
553

554
%% Set one field of the will message from a {Field, Value} pair.
%% Topic and payload are flattened to binaries, retain must be a boolean,
%% and qos goes through ?QOS_I.
init_will_msg({topic, WillTopic}, Msg) ->
    TopicBin = iolist_to_binary(WillTopic),
    Msg#mqtt_msg{topic = TopicBin};
init_will_msg({props, WillProps}, Msg) ->
    Msg#mqtt_msg{props = WillProps};
init_will_msg({payload, WillPayload}, Msg) ->
    PayloadBin = iolist_to_binary(WillPayload),
    Msg#mqtt_msg{payload = PayloadBin};
init_will_msg({retain, WillRetain}, Msg) when is_boolean(WillRetain) ->
    Msg#mqtt_msg{retain = WillRetain};
init_will_msg({qos, WillQoS}, Msg) ->
    Msg#mqtt_msg{qos = ?QOS_I(WillQoS)}.
564

565
%% Install a fresh frame parser state for the client's protocol version.
%% The maximum packet size comes from the 'Maximum-Packet-Size' property
%% and falls back to ?MAX_PACKET_SIZE.
init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) ->
    MaxSize = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE),
    ParserOpts = #{max_packet_size => MaxSize, version => Ver},
    ParseState = emqx_frame:initial_state(ParserOpts),
    State#state{parse_state = ParseState}.
569

570
%% gen_statem callback: one callback function per state name.
callback_mode() ->
    state_functions.
571

572
%% State function for `initialized'.
%% On a `connect' call: open the socket, send the connect packet and move
%% to waiting_for_connack, using the connect timeout as the state's event
%% timeout. The caller is answered later; the call is only recorded here.
%% A failure at either step stops the process with that reason and returns
%% the error tuple to the caller.
%% Every other event is passed to handle_event/4.
initialized({call, From}, connect, State = #state{sock_opts       = SockOpts,
                                                  connect_timeout = Timeout}) ->
    Outcome = case sock_connect(hosts(State), SockOpts, Timeout) of
                  {ok, Sock} ->
                      mqtt_connect(run_sock(State#state{socket = Sock}));
                  {error, _} = SockError ->
                      SockError
              end,
    case Outcome of
        {ok, NewState} ->
            PendingConnect = new_call(connect, From),
            {next_state, waiting_for_connack,
             add_call(PendingConnect, NewState), [Timeout]};
        {error, Reason} = Error ->
            {stop_and_reply, Reason, [{reply, From, Error}]}
    end;

initialized(EventType, EventContent, State) ->
    handle_event(EventType, EventContent, initialized, State).
589

590
%% Build the CONNECT packet from the client state and pass it to send/2.
%% Only the properties accepted by emqx_mqtt_props:filter/2 for ?CONNECT
%% are included. Returns whatever send/2 returns.
mqtt_connect(State = #state{proto_ver   = ProtoVer,
                            proto_name  = ProtoName,
                            bridge_mode = IsBridge,
                            clean_start = CleanStart,
                            keepalive   = KeepAlive,
                            client_id   = ClientId,
                            username    = Username,
                            password    = Password,
                            will_flag   = WillFlag,
                            will_msg    = WillMsg,
                            properties  = Properties}) ->
    #mqtt_msg{qos     = WillQoS,
              retain  = WillRetain,
              topic   = WillTopic,
              props   = WillProps,
              payload = WillPayload} = WillMsg,
    ConnProps = emqx_mqtt_props:filter(?CONNECT, Properties),
    Connect = #mqtt_packet_connect{proto_ver    = ProtoVer,
                                   proto_name   = ProtoName,
                                   is_bridge    = IsBridge,
                                   clean_start  = CleanStart,
                                   keepalive    = KeepAlive,
                                   properties   = ConnProps,
                                   client_id    = ClientId,
                                   username     = Username,
                                   password     = Password,
                                   will_flag    = WillFlag,
                                   will_qos     = WillQoS,
                                   will_retain  = WillRetain,
                                   will_props   = WillProps,
                                   will_topic   = WillTopic,
                                   will_payload = WillPayload},
    send(?CONNECT_PACKET(Connect), State).
619

620
%% State function for `waiting_for_connack': the connect packet has been
%% sent and the connect caller is still waiting for its reply.
%%
%% Successful CONNACK: merge the returned properties into the client's
%% properties, reply {ok, Properties} and move to `connected'.
waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS,
                                          SessPresent,
                                          Properties),
                    State = #state{properties = AllProps,
                                   client_id = ClientId}) ->
    case take_call(connect, State) of
        {value, #call{from = From}, State1} ->
            AllProps1 = case Properties of
                            undefined -> AllProps;
                            _ -> maps:merge(AllProps, Properties)
                        end,
            Reply = {ok, Properties},
            State2 = State1#state{client_id = assign_id(ClientId, AllProps1),
                                  properties = AllProps1,
                                  session_present = SessPresent},
            {next_state, connected, ensure_keepalive_timer(State2),
             [{reply, From, Reply}]};
        false ->
            {stop, bad_connack}
    end;

%% CONNACK with any other reason code: reply {error, {Reason, Properties}}
%% and shut down.
waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode,
                                          _SessPresent,
                                          Properties),
                    State = #state{proto_ver = ProtoVer}) ->
    Reason = emqx_reason_codes:name(ReasonCode, ProtoVer),
    case take_call(connect, State) of
        {value, #call{from = From}, _State} ->
            Reply = {error, {Reason, Properties}},
            {stop_and_reply, {shutdown, Reason}, [{reply, From, Reply}]};
        false -> {stop, connack_error}
    end;

%% Event timeout set when entering this state: no CONNACK arrived in time.
waiting_for_connack(timeout, _Timeout, State) ->
    case take_call(connect, State) of
        {value, #call{from = From}, _State} ->
            Reply = {error, connack_timeout},
            {stop_and_reply, connack_timeout, [{reply, From, Reply}]};
        false -> {stop, connack_timeout}
    end;

%% Any other event goes to handle_event/4. If that asks to stop, the
%% pending connect caller is answered with the reason first.
%% The stop pattern used to reuse the already-bound State variable, so it
%% only matched when handle_event/4 returned the data unchanged; a stop
%% carrying updated data skipped the reply. It now matches any data and
%% passes that data on with the stop.
waiting_for_connack(EventType, EventContent, State) ->
    case take_call(connect, State) of
        {value, #call{from = From}, _State} ->
            case handle_event(EventType, EventContent, waiting_for_connack, State) of
                {stop, Reason, NewState} ->
                    Reply = {error, {Reason, EventContent}},
                    {stop_and_reply, Reason, [{reply, From, Reply}], NewState};
                StateCallbackResult ->
                    StateCallbackResult
            end;
        false -> {stop, connack_timeout}
    end.
673

674
%% State callback for the `connected' state.
%%
%% Clause order matters: earlier clauses shadow later ones, and every event
%% that no clause below recognises is handed to handle_event/4.

%% --- Calls answered directly from the state data --------------------------

connected({call, Caller}, subscriptions, #state{subscriptions = Subs}) ->
    {keep_state_and_data, [{reply, Caller, maps:to_list(Subs)}]};

connected({call, Caller}, info, State) ->
    %% Drop the record tag, then pair each field name with its value.
    [_RecordTag | Values] = tuple_to_list(State),
    Info = lists:zip(record_info(fields, state), Values),
    {keep_state_and_data, [{reply, Caller, Info}]};

connected({call, Caller}, pause, State) ->
    {keep_state, State#state{paused = true}, [{reply, Caller, ok}]};

connected({call, Caller}, resume, State) ->
    {keep_state, State#state{paused = false}, [{reply, Caller, ok}]};

connected({call, Caller}, client_id, #state{client_id = ClientId}) ->
    {keep_state_and_data, [{reply, Caller, ClientId}]};

%% --- Calls that put a packet on the wire -----------------------------------

%% SUBSCRIBE: the caller is parked as a pending call (replied to by the
%% SUBACK clause) and the requested topics are recorded immediately.
connected({call, Caller}, SubReq = {subscribe, Properties, Topics},
          State = #state{last_packet_id = PacketId, subscriptions = Subs}) ->
    case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of
        {ok, Sent} ->
            Pending = new_call({subscribe, PacketId}, Caller, SubReq),
            Subs1 = lists:foldl(fun({Topic, Opts}, Acc) ->
                                    Acc#{Topic => Opts}
                                end, Subs, Topics),
            Sent1 = Sent#state{subscriptions = Subs1},
            {keep_state, ensure_ack_timer(add_call(Pending, Sent1))};
        {error, Reason} = Error ->
            {stop_and_reply, Reason, [{reply, Caller, Error}]}
    end;

%% QoS 0 PUBLISH: nothing to track, reply as soon as the packet is sent.
connected({call, Caller}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) ->
    case send(Msg, State) of
        {ok, Sent} ->
            {keep_state, Sent, [{reply, Caller, ok}]};
        {error, Reason} = Error ->
            {stop_and_reply, Reason, [{reply, Caller, Error}]}
    end;

%% QoS 1/2 PUBLISH: the message is stored in the inflight window under the
%% current packet id; a full window moves the machine to `inflight_full'.
connected({call, Caller}, {publish, Msg = #mqtt_msg{qos = QoS}},
          State = #state{inflight = Inflight, last_packet_id = PacketId})
    when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
    Tracked = Msg#mqtt_msg{packet_id = PacketId},
    case send(Tracked, State) of
        {ok, Sent} ->
            Inflight1 = emqx_inflight:insert(PacketId, {publish, Tracked, os:timestamp()}, Inflight),
            State1 = ensure_retry_timer(Sent#state{inflight = Inflight1}),
            Replies = [{reply, Caller, {ok, PacketId}}],
            case emqx_inflight:is_full(Inflight1) of
                true  -> {next_state, inflight_full, State1, Replies};
                false -> {keep_state, State1, Replies}
            end;
        {error, Reason} ->
            {stop_and_reply, Reason, [{reply, Caller, {error, {PacketId, Reason}}}]}
    end;

%% UNSUBSCRIBE: the request is kept in the pending call so the UNSUBACK
%% clause can find the topics to forget.
connected({call, Caller}, UnsubReq = {unsubscribe, Properties, Topics},
          State = #state{last_packet_id = PacketId}) ->
    case send(?UNSUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of
        {ok, Sent} ->
            Pending = new_call({unsubscribe, PacketId}, Caller, UnsubReq),
            {keep_state, ensure_ack_timer(add_call(Pending, Sent))};
        {error, Reason} = Error ->
            {stop_and_reply, Reason, [{reply, Caller, Error}]}
    end;

%% PINGREQ: answered with `pong' by the PINGRESP clause.
connected({call, Caller}, ping, State) ->
    case send(?PACKET(?PINGREQ), State) of
        {ok, Sent} ->
            Pending = new_call(ping, Caller),
            {keep_state, ensure_ack_timer(add_call(Pending, Sent))};
        {error, Reason} = Error ->
            {stop_and_reply, Reason, [{reply, Caller, Error}]}
    end;

%% DISCONNECT: stop normally once the packet has been sent.
connected({call, Caller}, {disconnect, ReasonCode, Properties}, State) ->
    case send(?DISCONNECT_PACKET(ReasonCode, Properties), State) of
        {ok, Sent} ->
            {stop_and_reply, normal, [{reply, Caller, ok}], Sent};
        {error, Reason} = Error ->
            {stop_and_reply, Reason, [{reply, Caller, Error}]}
    end;

%% --- Acknowledgements requested through casts ------------------------------

connected(cast, {puback, PacketId, ReasonCode, Properties}, State) ->
    send_puback(?PUBACK_PACKET(PacketId, ReasonCode, Properties), State);

connected(cast, {pubrec, PacketId, ReasonCode, Properties}, State) ->
    send_puback(?PUBREC_PACKET(PacketId, ReasonCode, Properties), State);

connected(cast, {pubrel, PacketId, ReasonCode, Properties}, State) ->
    send_puback(?PUBREL_PACKET(PacketId, ReasonCode, Properties), State);

connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) ->
    send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State);

%% --- Incoming packets (injected as cast events by process_incoming/3) ------

%% While paused, incoming PUBLISH packets are dropped.
connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) ->
    keep_state_and_data;

connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) ->
    {keep_state, deliver(packet_to_msg(Packet), State)};

connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
    publish_process(?QOS_1, Packet, State);

connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
    publish_process(?QOS_2, Packet, State);

connected(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) ->
    {keep_state, delete_inflight(PubAck, State)};

%% PUBREC: flip the inflight entry from `publish' to `pubrel' when it is
%% known; a PUBREL is sent back in every case.
connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) ->
    NewState =
        case emqx_inflight:lookup(PacketId, Inflight) of
            {value, {publish, _Msg, _Ts}} ->
                Entry = {pubrel, PacketId, os:timestamp()},
                State#state{inflight = emqx_inflight:update(PacketId, Entry, Inflight)};
            {value, {pubrel, _Ref, _Ts}} ->
                ?LOG(notice, "[Client] Duplicated PUBREC Packet: ~p", [PacketId]),
                State;
            none ->
                ?LOG(warning, "[Client] Unexpected PUBREC Packet: ~p", [PacketId]),
                State
        end,
    send_puback(?PUBREL_PACKET(PacketId), NewState);

%%TODO::... if auto_ack is false, should we take PacketId from the map?
%% PUBREL: deliver the QoS 2 packet stored in awaiting_rel and, with
%% auto_ack enabled, complete the exchange with a PUBCOMP.
connected(cast, ?PUBREL_PACKET(PacketId),
          State = #state{awaiting_rel = AwaitingRel, auto_ack = AutoAck}) ->
    case maps:take(PacketId, AwaitingRel) of
        {Packet, Remaining} ->
            Delivered = deliver(packet_to_msg(Packet),
                                State#state{awaiting_rel = Remaining}),
            case AutoAck of
                true  -> send_puback(?PUBCOMP_PACKET(PacketId), Delivered);
                false -> {keep_state, Delivered}
            end;
        error ->
            ?LOG(warning, "[Client] Unexpected PUBREL: ~p", [PacketId]),
            keep_state_and_data
    end;

connected(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) ->
    {keep_state, delete_inflight(PubComp, State)};

%% SUBACK: reply to the parked subscribe call, if one is still pending.
connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes),
          State = #state{}) ->
    case take_call({subscribe, PacketId}, State) of
        {value, #call{from = Caller}, NewState} ->
            %%TODO: Merge reason codes to subscriptions?
            {keep_state, NewState, [{reply, Caller, {ok, Properties, ReasonCodes}}]};
        false ->
            keep_state_and_data
    end;

%% UNSUBACK: forget the topics named in the original request and reply.
connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes),
          State = #state{subscriptions = Subs}) ->
    case take_call({unsubscribe, PacketId}, State) of
        {value, #call{from = Caller, req = {_, _, Topics}}, NewState} ->
            Subs1 = maps:without(Topics, Subs),
            {keep_state, NewState#state{subscriptions = Subs1},
             [{reply, Caller, {ok, Properties, ReasonCodes}}]};
        false ->
            keep_state_and_data
    end;

%% PINGRESP with nothing pending (e.g. a keepalive ping) is ignored.
connected(cast, ?PACKET(?PINGRESP), #state{pending_calls = []}) ->
    keep_state_and_data;
connected(cast, ?PACKET(?PINGRESP), State) ->
    case take_call(ping, State) of
        {value, #call{from = Caller}, NewState} ->
            {keep_state, NewState, [{reply, Caller, pong}]};
        false ->
            keep_state_and_data
    end;

connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) ->
    {stop, {disconnected, ReasonCode, Properties}, State};

%% --- Timers -----------------------------------------------------------------

%% With force_ping set, a PINGREQ is sent on every keepalive tick.
connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) ->
    case send(?PACKET(?PINGREQ), State) of
        {ok, Sent} ->
            {keep_state, ensure_keepalive_timer(Sent)};
        Error ->
            {stop, Error}
    end;

%% Otherwise ping only when not paused and should_ping/1 says so.
connected(info, {timeout, TRef, keepalive},
          State = #state{socket = Sock, paused = Paused, keepalive_timer = TRef}) ->
    case (not Paused) andalso should_ping(Sock) of
        true ->
            case send(?PACKET(?PINGREQ), State) of
                {ok, Sent} ->
                    {keep_state, ensure_keepalive_timer(Sent), [hibernate]};
                Error ->
                    {stop, Error}
            end;
        false ->
            {keep_state, ensure_keepalive_timer(State), [hibernate]};
        {error, Reason} ->
            {stop, Reason}
    end;

%% Ack timer: expire overdue pending calls, re-arm if any remain.
connected(info, {timeout, TRef, ack}, State = #state{ack_timer     = TRef,
                                                     ack_timeout   = Timeout,
                                                     pending_calls = Calls}) ->
    Remaining = timeout_calls(Timeout, Calls),
    {keep_state, ensure_ack_timer(State#state{ack_timer     = undefined,
                                              pending_calls = Remaining})};

%% Retry timer: resend inflight messages, or disarm when the window is empty.
connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef,
                                                       inflight    = Inflight}) ->
    case emqx_inflight:is_empty(Inflight) of
        true  -> {keep_state, State#state{retry_timer = undefined}};
        false -> retry_send(State)
    end;

%% --- Everything else ----------------------------------------------------------

connected(EventType, EventContent, Data) ->
    handle_event(EventType, EventContent, connected, Data).
821×
891

892
%% State callback for the `inflight_full' state.
%%
%% QoS 1/2 publish calls are postponed until the state changes; PUBACK and
%% PUBCOMP packets go through delete_inflight_when_full/2, which decides
%% whether to return to `connected'.
inflight_full({call, _From}, {publish, #mqtt_msg{qos = QoS}}, _State)
    when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
    {keep_state_and_data, [postpone]};
inflight_full(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = Ack, State) ->
    delete_inflight_when_full(Ack, State);
inflight_full(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = Comp, State) ->
    delete_inflight_when_full(Comp, State);
inflight_full(EventType, EventContent, Data) ->
    %% inflight_full is a sub-state of connected state,
    %% delegate all other events to connected state.
    connected(EventType, EventContent, Data).
!
902

903
%% Fallback handler for events the state callbacks do not handle themselves.
%% Takes the event type, the event content, the current state name and the
%% state data; returns a gen_statem state-callback result.

%% Explicit stop request.
handle_event({call, Caller}, stop, _StateName, _State) ->
    {stop_and_reply, normal, [{reply, Caller, ok}]};

%% Bytes from the socket: re-arm the socket, then parse.
handle_event(info, {Transport, _Sock, Bytes}, _StateName, State)
    when Transport =:= tcp orelse Transport =:= ssl ->
    ?LOG(debug, "[Client] RECV Data: ~p", [Bytes]),
    process_incoming(Bytes, [], run_sock(State));

%% Socket error reported by the transport.
handle_event(info, {ErrorTag, _Sock, Reason}, _StateName, State)
    when ErrorTag =:= tcp_error orelse ErrorTag =:= ssl_error ->
    ?LOG(error, "[Client] The connection error occured ~p, reason:~p", [ErrorTag, Reason]),
    {stop, {shutdown, Reason}, State};

%% Socket closed.
handle_event(info, {ClosedTag, _Sock}, _StateName, State)
    when ClosedTag =:= tcp_closed orelse ClosedTag =:= ssl_closed ->
    ?LOG(debug, "[Client] ~p", [ClosedTag]),
    {stop, {shutdown, ClosedTag}, State};

%% The owner process exited: shut down with its reason.
handle_event(info, {'EXIT', Owner, Reason}, _StateName, State = #state{owner = Owner}) ->
    ?LOG(debug, "[Client] Got EXIT from owner, Reason: ~p", [Reason]),
    {stop, {shutdown, Reason}, State};

%% Result of an earlier socket send.
handle_event(info, {inet_reply, _Sock, ok}, _StateName, _State) ->
    keep_state_and_data;

handle_event(info, {inet_reply, _Sock, {error, Reason}}, _StateName, State) ->
    ?LOG(error, "[Client] Got tcp error: ~p", [Reason]),
    {stop, {shutdown, Reason}, State};

%% Normal exit of some other process: logged at info level and ignored.
handle_event(info, Event = {'EXIT', _Pid, normal}, StateName, _State) ->
    ?LOG(info, "[Client] State: ~s, Unexpected Event: (info, ~p)",
         [StateName, Event]),
    keep_state_and_data;

%% Anything else is unexpected: log it and carry on.
handle_event(Type, Content, StateName, _StateData) ->
    ?LOG(error, "[Client] State: ~s, Unexpected Event: (~p, ~p)",
         [StateName, Type, Content]),
    keep_state_and_data.
!
940

941
%% Mandatory callback functions
942
%% gen_statem terminate callback: reports the disconnect through
%% eval_msg_handler/3 and closes the socket if there is one.
terminate(Reason, _StateName, State = #state{socket = Socket}) ->
    Payload = case Reason of
                  {disconnected, ReasonCode, Properties} ->
                      %% backward compatible
                      {ReasonCode, Properties};
                  _ ->
                      Reason
              end,
    ok = eval_msg_handler(State, disconnected, Payload),
    case Socket of
        undefined -> ok;
        _         -> emqx_client_sock:close(Socket)
    end.
954

955
%% gen_statem code_change callback: no conversion is performed, the state
%% name and the state data are returned untouched.
code_change(_OldVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.
!
957

958
%%------------------------------------------------------------------------------
959
%% Internal functions
960
%%------------------------------------------------------------------------------
961

962
%% Decides whether a keepalive PINGREQ is due.
%%
%% Reads the socket's `send_oct' counter and remembers it in the process
%% dictionary. Returns true when there is no previous reading or when the
%% counter equals the previous reading, false otherwise, and passes through
%% an {error, Reason} from getstat.
should_ping(Sock) ->
    case emqx_client_sock:getstat(Sock, [send_oct]) of
        {ok, [{send_oct, Current}]} ->
            %% put/2 hands back the previously stored value (or undefined).
            Previous = put(send_oct, Current),
            Previous == undefined orelse Previous == Current;
        {error, _Reason} = Error ->
            Error
    end.
970

971
%% Removes the inflight entry acknowledged by a PUBACK or PUBCOMP packet.
%%
%% A PUBACK is expected to hit a `publish' entry and a PUBCOMP a `pubrel'
%% entry; in both cases the `puback' message handler is evaluated before the
%% entry is deleted. An unknown packet id is logged and the state is returned
%% unchanged.
delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties),
                State = #state{inflight = Inflight}) ->
    case emqx_inflight:lookup(PacketId, Inflight) of
        {value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} ->
            Ack = #{packet_id   => PacketId,
                    reason_code => ReasonCode,
                    properties  => Properties},
            ok = eval_msg_handler(State, puback, Ack),
            State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
        none ->
            ?LOG(warning, "[Client] Unexpected PUBACK: ~p", [PacketId]),
            State
    end;
delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
                State = #state{inflight = Inflight}) ->
    case emqx_inflight:lookup(PacketId, Inflight) of
        {value, {pubrel, _PacketId, _Ts}} ->
            Ack = #{packet_id   => PacketId,
                    reason_code => ReasonCode,
                    properties  => Properties},
            ok = eval_msg_handler(State, puback, Ack),
            State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
        none ->
            ?LOG(warning, "[Client] Unexpected PUBCOMP Packet: ~p", [PacketId]),
            State
    end.
995

996
%% Handles a PUBACK/PUBCOMP while in `inflight_full': removes the entry via
%% delete_inflight/2, then stays put if the window is still full or moves
%% back to `connected' otherwise.
delete_inflight_when_full(Packet, State0) ->
    State = delete_inflight(Packet, State0),
    case emqx_inflight:is_full(State#state.inflight) of
        false -> {next_state, connected, State};
        true  -> {keep_state, State}
    end.
1002

1003
%% Picks the client id to keep after a successful connect.
%%
%% When the client connected with ?NO_CLIENT_ID, the id is read from the
%% 'Assigned-Client-Identifier' property and its absence raises
%% error(bad_client_id). Any other id is returned as is.
assign_id(?NO_CLIENT_ID, Props) ->
    case maps:find('Assigned-Client-Identifier', Props) of
        {ok, Assigned} -> Assigned;
        error          -> error(bad_client_id)
    end;
assign_id(ClientId, _Props) ->
    ClientId.
32×
1012

1013
%% Processes an incoming QoS 1 or QoS 2 PUBLISH packet.
%%
%% QoS 1: the message is delivered right away and, with auto_ack enabled,
%% acknowledged with a PUBACK.
%% QoS 2: a PUBREC is sent first; if that succeeds the packet is parked in
%% awaiting_rel, otherwise the stop result of send_puback/2 is returned.
publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId),
                State0 = #state{auto_ack = AutoAck}) ->
    Delivered = deliver(packet_to_msg(Packet), State0),
    case AutoAck of
        false -> {keep_state, Delivered};
        true  -> send_puback(?PUBACK_PACKET(PacketId), Delivered)
    end;
publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
                State = #state{awaiting_rel = AwaitingRel}) ->
    case send_puback(?PUBREC_PACKET(PacketId), State) of
        {keep_state, Sent} ->
            {keep_state, Sent#state{awaiting_rel = maps:put(PacketId, Packet, AwaitingRel)}};
        Stop ->
            Stop
    end.
1027

1028
%% Arms the keepalive timer.
%%
%% A 'Server-Keep-Alive' property, when the ?PROPERTY pattern matches,
%% overrides the stored keepalive value. A keepalive of 0 leaves the state
%% untouched (no timer). Otherwise a timer for keepalive seconds is started.
ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', ServerSecs)) ->
    ensure_keepalive_timer(timer:seconds(ServerSecs), State#state{keepalive = ServerSecs});
ensure_keepalive_timer(State = #state{keepalive = 0}) ->
    State;
ensure_keepalive_timer(State = #state{keepalive = Secs}) ->
    ensure_keepalive_timer(timer:seconds(Secs), State).

%% Starts a `keepalive' timer of the given number of milliseconds and stores
%% its reference in the state.
ensure_keepalive_timer(Millis, State) when is_integer(Millis) ->
    TimerRef = erlang:start_timer(Millis, self(), keepalive),
    State#state{keepalive_timer = TimerRef}.
31×
1036

1037
%% Builds a pending-call record without an attached request.
new_call(Id, From) ->
    new_call(Id, From, undefined).

%% Builds a pending-call record for Id/From, remembering the original request
%% and stamping it with the current os:timestamp().
new_call(Id, From, Req) ->
    CreatedAt = os:timestamp(),
    #call{id = Id, from = From, req = Req, ts = CreatedAt}.
53×
1041

1042
%% Pushes a pending call onto the front of the pending_calls list.
add_call(Call, Data) ->
    Data#state{pending_calls = [Call | Data#state.pending_calls]}.
53×
1044

1045
%% Looks up the pending call with the given id.
%% Returns {value, Call, DataWithoutThatCall}, or false when no such call is
%% pending.
take_call(Id, Data = #state{pending_calls = Pending}) ->
    case lists:keytake(Id, #call.id, Pending) of
        false ->
            false;
        {value, Call, Remaining} ->
            {value, Call, Data#state{pending_calls = Remaining}}
    end.
1051

1052
%% Expires pending calls that have waited at least Timeout milliseconds,
%% measured against the current os:timestamp(). Returns the calls that are
%% still pending.
timeout_calls(Timeout, Calls) ->
    timeout_calls(os:timestamp(), Timeout, Calls).

%% Same as timeout_calls/2 with an explicit notion of "now".
%%
%% Each expired caller is answered with {error, ack_timeout}. The stored
%% `from' is the gen_statem From handle taken from a {call, From} event, so
%% the reply has to go through gen_statem:reply/2: a bare `From ! Msg' is not
%% a valid send destination for that handle and would never reach the blocked
%% caller as a call reply.
timeout_calls(Now, Timeout, Calls) ->
    lists:foldl(fun(C = #call{from = From, ts = Ts}, Acc) ->
                    %% now_diff/2 yields microseconds; Timeout is in ms.
                    case (timer:now_diff(Now, Ts) div 1000) >= Timeout of
                        true  -> ok = gen_statem:reply(From, {error, ack_timeout}),
                                 Acc;
                        false -> [C | Acc]
                    end
                end, [], Calls).
1062

1063
%% Starts the ack timer when none is running and at least one call is
%% pending; otherwise returns the state unchanged.
%%
%% The non-empty check is a `[_ | _]' pattern rather than a `length/1' guard,
%% which would walk the whole list just to compare against zero.
ensure_ack_timer(State = #state{ack_timer     = undefined,
                                ack_timeout   = Timeout,
                                pending_calls = [_ | _]}) ->
    State#state{ack_timer = erlang:start_timer(Timeout, self(), ack)};
ensure_ack_timer(State) -> State.
2×
1068

1069
%% Arms the retry timer using the configured retry_interval.
ensure_retry_timer(State) ->
    do_ensure_retry_timer(State#state.retry_interval, State).
213×
1071

1072
%% Starts a `retry' timer of Interval milliseconds, but only when Interval is
%% positive and no retry timer is currently recorded; otherwise the state is
%% returned as is.
do_ensure_retry_timer(Interval, State = #state{retry_timer = undefined})
    when Interval > 0 ->
    TimerRef = erlang:start_timer(Interval, self(), retry),
    State#state{retry_timer = TimerRef};
do_ensure_retry_timer(_Interval, State) ->
    State.
213×
1077

1078
%% Entry point of the retry pass: orders the inflight entries by their
%% timestamp (third tuple element) and walks them via retry_send/3.
retry_send(State = #state{inflight = Inflight}) ->
    ByTimestamp = fun({_, _, TsA}, {_, _, TsB}) -> TsA < TsB end,
    Entries = lists:sort(ByTimestamp, emqx_inflight:values(Inflight)),
    retry_send(Entries, os:timestamp(), State).
!
1082

1083
%% Walks the sorted inflight entries, resending each one whose age has
%% reached retry_interval. The walk stops at the first entry that is not due
%% yet, re-arming the timer for the remaining time, or when a send fails.
retry_send([], _Now, State) ->
    {keep_state, ensure_retry_timer(State)};
retry_send([{Type, Msg, Ts} | Rest], Now, State = #state{retry_interval = Interval}) ->
    AgeMs = timer:now_diff(Now, Ts) div 1000, %% micro -> ms
    case AgeMs >= Interval of
        false ->
            {keep_state, do_ensure_retry_timer(Interval - AgeMs, State)};
        true ->
            case retry_send(Type, Msg, Now, State) of
                {ok, NewState} -> retry_send(Rest, Now, NewState);
                {error, Error} -> {stop, Error}
            end
    end.
1094

1095
%% Resends one inflight entry and refreshes its timestamp to Now.
%%
%% publish: the message is sent again with the dup flag set to
%%          (QoS =:= ?QOS_1).
%% pubrel:  the PUBREL for the packet id is sent again.
%% Returns {ok, NewState} or the {error, Reason} from send/2.
retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId},
           Now, State = #state{inflight = Inflight}) ->
    Resent = Msg#mqtt_msg{dup = (QoS =:= ?QOS_1)},
    case send(Resent, State) of
        {ok, Sent} ->
            Refreshed = emqx_inflight:update(PacketId, {publish, Resent, Now}, Inflight),
            {ok, Sent#state{inflight = Refreshed}};
        {error, _Reason} = Error ->
            Error
    end;
retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) ->
    case send(?PUBREL_PACKET(PacketId), State) of
        {ok, Sent} ->
            Refreshed = emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight),
            {ok, Sent#state{inflight = Refreshed}};
        {error, _Reason} = Error ->
            Error
    end.
1113

1114
%% Hands a received message to the `publish' message handler as a map that
%% also carries the client pid, and returns the state unchanged.
deliver(#mqtt_msg{qos       = QoS,
                  dup       = Dup,
                  retain    = Retain,
                  packet_id = PacketId,
                  topic     = Topic,
                  props     = Props,
                  payload   = Payload},
        State) ->
    ok = eval_msg_handler(State, publish,
                          #{qos        => QoS,
                            dup        => Dup,
                            retain     => Retain,
                            packet_id  => PacketId,
                            topic      => Topic,
                            properties => Props,
                            payload    => Payload,
                            client_pid => self()}),
    State.
217×
1122

1123
%% Notifies the user of the client about an event of the given Kind.
%%
%% Without a handler (?NO_MSG_HDLR) the owner process is messaged instead;
%% with a handler map, the fun stored under Kind is called with Msg and its
%% result is discarded. Always returns ok.
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, owner = Owner},
                 disconnected, {ReasonCode, Properties}) ->
    %% Special handling for disconnected message when there is no handler callback
    Owner ! {disconnected, ReasonCode, Properties},
    ok;
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR},
                 disconnected, _OtherReason) ->
    %% do nothing to be backward compatible
    ok;
eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, owner = Owner},
                 Kind, Msg) ->
    Owner ! {Kind, Msg},
    ok;
eval_msg_handler(#state{msg_handler = Handler}, Kind, Msg) ->
    Callback = maps:get(Kind, Handler),
    _ = Callback(Msg),
    ok.
401×
1141

1142
%% Converts a PUBLISH #mqtt_packet{} into the equivalent #mqtt_msg{}.
packet_to_msg(#mqtt_packet{
                 header   = #mqtt_packet_header{type   = ?PUBLISH,
                                                qos    = QoS,
                                                dup    = Dup,
                                                retain = Retain},
                 variable = #mqtt_packet_publish{packet_id  = PacketId,
                                                 topic_name = Topic,
                                                 properties = Props},
                 payload  = Payload}) ->
    #mqtt_msg{packet_id = PacketId,
              qos       = QoS,
              dup       = Dup,
              retain    = Retain,
              topic     = Topic,
              props     = Props,
              payload   = Payload}.
1152

1153
%% Converts a #mqtt_msg{} into the PUBLISH #mqtt_packet{} that carries it.
msg_to_packet(#mqtt_msg{packet_id = PacketId,
                        qos       = QoS,
                        dup       = Dup,
                        retain    = Retain,
                        topic     = Topic,
                        props     = Props,
                        payload   = Payload}) ->
    Header = #mqtt_packet_header{type   = ?PUBLISH,
                                 qos    = QoS,
                                 retain = Retain,
                                 dup    = Dup},
    Variable = #mqtt_packet_publish{topic_name = Topic,
                                    packet_id  = PacketId,
                                    properties = Props},
    #mqtt_packet{header = Header, variable = Variable, payload = Payload}.
1163

1164
%%------------------------------------------------------------------------------
1165
%% Socket Connect/Send
1166

1167
%% Tries each {Host, Port} in turn and returns the first {ok, Socket}.
%% With no hosts at all the result is {error, no_hosts}; when every host
%% fails, the error of the last attempt is returned.
sock_connect(Hosts, SockOpts, Timeout) ->
    sock_connect(Hosts, SockOpts, Timeout, {error, no_hosts}).

%% Worker for sock_connect/3; the fourth argument is the error to report if
%% the remaining host list is exhausted.
sock_connect([{Host, Port} | Rest], SockOpts, Timeout, _PrevErr) ->
    case emqx_client_sock:connect(Host, Port, SockOpts, Timeout) of
        {ok, Socket} ->
            {ok, Socket};
        {error, _Reason} = Failure ->
            sock_connect(Rest, SockOpts, Timeout, Failure)
    end;
sock_connect([], _SockOpts, _Timeout, LastErr) ->
    LastErr.
1178

1179
%% Returns the list of {Host, Port} candidates to connect to: the configured
%% hosts list, or the single host/port pair when that list is empty.
hosts(State = #state{hosts = Hosts}) ->
    case Hosts of
        [] -> [{State#state.host, State#state.port}];
        _  -> Hosts
    end.
!
1182

1183
%% Sends an acknowledgement packet and maps the outcome to a state-callback
%% result: keep the state on success, stop with {shutdown, Reason} on error.
send_puback(Packet, State) ->
    case send(Packet, State) of
        {error, Reason} -> {stop, {shutdown, Reason}};
        {ok, NewState}  -> {keep_state, NewState}
    end.
1188

1189
%% Serializes and sends a message or packet over the client socket.
%%
%% An #mqtt_msg{} is first converted to a PUBLISH packet. On a successful
%% socket send the last packet id is bumped and {ok, NewState} is returned;
%% any other result of emqx_client_sock:send/2 is returned as is.
send(#mqtt_msg{} = Msg, State) ->
    send(msg_to_packet(Msg), State);

send(#mqtt_packet{} = Packet, State = #state{socket = Sock, proto_ver = Ver}) ->
    Bytes = emqx_frame:serialize(Packet, #{version => Ver}),
    ?LOG(debug, "[Client] SEND Data: ~1000p", [Packet]),
    case emqx_client_sock:send(Sock, Bytes) of
        ok ->
            {ok, bump_last_packet_id(State)};
        Error ->
            Error
    end.
1200

1201
%% Re-arms the socket with {active, once} and returns the state unchanged.
%% The result of setopts is not checked.
run_sock(State) ->
    emqx_client_sock:setopts(State#state.socket, [{active, once}]),
    State.
418×
1203

1204
%%------------------------------------------------------------------------------
1205
%% Process incoming
1206

1207
%% Parses received bytes into packets and turns them into next_event actions.
%%
%% Packets are accumulated newest-first; next_events/1 restores arrival
%% order. After each complete packet the parse state is reset; an incomplete
%% trailing frame is kept in parse_state for the next chunk. A parse error,
%% returned or raised as an `error' exception, stops the state machine.
process_incoming(<<>>, Packets, State) ->
    {keep_state, State, next_events(Packets)};

process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) ->
    try emqx_frame:parse(Bytes, ParseState) of
        {ok, Packet, Rest} ->
            process_incoming(Rest, [Packet | Packets], init_parse_state(State));
        {more, Continuation} ->
            {keep_state, State#state{parse_state = Continuation}, next_events(Packets)};
        {error, Reason} ->
            {stop, Reason}
    catch
        error:Error ->
            {stop, Error}
    end.
1222

1223
%% Turns parsed packets (accumulated newest-first) into gen_statem
%% next_event actions in arrival order. A single packet yields a bare action
%% rather than a one-element list.
next_events([]) ->
    [];
next_events([Only]) ->
    {next_event, cast, Only};
next_events(Packets) ->
    %% Prepending while folding left reverses the list back to arrival order.
    lists:foldl(fun(Packet, Actions) ->
                    [{next_event, cast, Packet} | Actions]
                end, [], Packets).
67×
1229

1230
%%------------------------------------------------------------------------------
1231
%% packet_id generation
1232

1233
%% Advances last_packet_id to the next packet id.
bump_last_packet_id(State) ->
    State#state{last_packet_id = next_packet_id(State#state.last_packet_id)}.
509×
1235

1236
%% Returns the packet id following Id, wrapping from ?MAX_PACKET_ID back
%% to 1.
-spec next_packet_id(packet_id()) -> packet_id().
next_packet_id(Id) when Id =:= ?MAX_PACKET_ID -> 1;
next_packet_id(Id) -> Id + 1.
509×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC