• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

processone / ejabberd / 1296

19 Jan 2026 11:25AM UTC coverage: 33.562% (+0.09%) from 33.468%
1296

push

github

badlop
mod_conversejs: Cosmetic change: sort paths alphabetically

0 of 4 new or added lines in 1 file covered. (0.0%)

11245 existing lines in 174 files now uncovered.

15580 of 46421 relevant lines covered (33.56%)

1074.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.45
/src/mod_stream_mgmt.erl
1
%%%-------------------------------------------------------------------
2
%%% Author  : Holger Weiss <holger@zedat.fu-berlin.de>
3
%%% Created : 25 Dec 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
4
%%%
5
%%%
6
%%% ejabberd, Copyright (C) 2002-2026   ProcessOne
7
%%%
8
%%% This program is free software; you can redistribute it and/or
9
%%% modify it under the terms of the GNU General Public License as
10
%%% published by the Free Software Foundation; either version 2 of the
11
%%% License, or (at your option) any later version.
12
%%%
13
%%% This program is distributed in the hope that it will be useful,
14
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
15
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16
%%% General Public License for more details.
17
%%%
18
%%% You should have received a copy of the GNU General Public License along
19
%%% with this program; if not, write to the Free Software Foundation, Inc.,
20
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
%%%
22
%%%-------------------------------------------------------------------
23
-module(mod_stream_mgmt).
24
-behaviour(gen_mod).
25
-author('holger@zedat.fu-berlin.de').
26
-protocol({xep, 198, '1.5.2', '14.05', "complete", ""}).
27

28
%% gen_mod API
29
-export([start/2, stop/1, reload/3, depends/2, mod_opt_type/1, mod_options/1]).
30
-export([mod_doc/0]).
31
%% hooks
32
-export([c2s_stream_started/2, c2s_stream_features/2,
33
         c2s_authenticated_packet/2, c2s_unauthenticated_packet/2,
34
         c2s_unbinded_packet/2, c2s_closed/2, c2s_terminated/2,
35
         c2s_handle_send/3, c2s_handle_info/2, c2s_handle_cast/2,
36
         c2s_handle_call/3, c2s_handle_recv/3, c2s_inline_features/3,
37
         c2s_handle_sasl2_inline/1, c2s_handle_sasl2_inline_post/3,
38
         c2s_handle_bind2_inline/1]).
39
%% adjust pending session timeout / access queue
40
-export([get_resume_timeout/1, set_resume_timeout/2, queue_find/2]).
41

42
%% for sasl2 inline resume
43
-export([has_resume_data/2, post_resume_tasks/1]).
44

45
-include_lib("xmpp/include/xmpp.hrl").
46
-include("logger.hrl").
47
-include_lib("p1_utils/include/p1_queue.hrl").
48
-include("translate.hrl").
49

50
-define(STREAM_MGMT_CACHE, stream_mgmt_cache).
51

52
-define(is_sm_packet(Pkt),
53
        is_record(Pkt, sm_enable) or
54
        is_record(Pkt, sm_resume) or
55
        is_record(Pkt, sm_a) or
56
        is_record(Pkt, sm_r)).
57

58
-type state() :: ejabberd_c2s:state().
59
-type queue() :: p1_queue:queue({non_neg_integer(), erlang:timestamp(), xmpp_element() | xmlel()}).
60
-type id() :: binary().
61
-type error_reason() :: session_not_found | session_timed_out |
62
                        session_is_dead | session_has_exited |
63
                        session_was_killed | session_copy_timed_out |
64
                        invalid_previd.
65

66
%%%===================================================================
67
%%% API
68
%%%===================================================================
69
%% gen_mod callback.  Initializes the cache through init_cache/1 and returns
%% {ok, Specs}, where Specs holds one {hook, HookName, Function, 50} tuple
%% per entry of the table below (in the same order).
start(_Host, Opts) ->
    init_cache(Opts),
    %% {HookName, HandlerFunction} pairs.
    HookFuns = [{c2s_stream_started, c2s_stream_started},
                {c2s_post_auth_features, c2s_stream_features},
                {c2s_inline_features, c2s_inline_features},
                {c2s_unauthenticated_packet, c2s_unauthenticated_packet},
                {c2s_unbinded_packet, c2s_unbinded_packet},
                {c2s_authenticated_packet, c2s_authenticated_packet},
                {c2s_handle_send, c2s_handle_send},
                {c2s_handle_recv, c2s_handle_recv},
                {c2s_handle_info, c2s_handle_info},
                {c2s_handle_cast, c2s_handle_cast},
                {c2s_handle_call, c2s_handle_call},
                {c2s_handle_sasl2_inline, c2s_handle_sasl2_inline},
                {c2s_handle_sasl2_inline_post, c2s_handle_sasl2_inline_post},
                {c2s_handle_bind2_inline, c2s_handle_bind2_inline},
                {c2s_closed, c2s_closed},
                {c2s_terminated, c2s_terminated}],
    {ok, [{hook, Hook, Fun, 50} || {Hook, Fun} <- HookFuns]}.
87

88
%% gen_mod callback.  Performs no work of its own and always returns `ok'.
stop(_Host) -> ok.
98✔
90

91
%% gen_mod callback.  Re-runs init_cache/1 with the new options and logs a
%% warning that only newly created client connections will see the new
%% configuration.  The old options are not consulted.
reload(_Host, NewOpts, _OldOpts) ->
    init_cache(NewOpts),
    ?WARNING_MSG(
       "Module ~ts is reloaded, but new configuration will take "
       "effect for newly created client connections only",
       [?MODULE]).
×
95

96
%% gen_mod callback.  This module declares no dependencies: the result is
%% always the empty list.
depends(_Host, _Opts) -> [].
116✔
98

99
%% Hook for c2s_stream_started.  Removes any `mgmt_options' key from the c2s
%% state and (re)initializes the stream management fields: the state becomes
%% `inactive', the three stanza counters are reset to 0 and the remaining
%% settings are read through the per-host getters.  A state that carries no
%% `lserver' key is returned untouched.
c2s_stream_started(#{lserver := LServer} = State, _StreamStart) ->
    Cleaned = maps:remove(mgmt_options, State),
    Timeout = get_configured_resume_timeout(LServer),
    MaxTimeout = get_max_resume_timeout(LServer, Timeout),
    Fresh = #{mgmt_state => inactive,
              mgmt_queue_type => get_queue_type(LServer),
              mgmt_max_queue => get_max_ack_queue(LServer),
              mgmt_timeout => Timeout,
              mgmt_max_timeout => MaxTimeout,
              mgmt_ack_timeout => get_ack_timeout(LServer),
              mgmt_resend => get_resend_on_timeout(LServer),
              mgmt_stanzas_in => 0,
              mgmt_stanzas_out => 0,
              mgmt_stanzas_req => 0},
    %% Keys of Fresh take precedence over anything already in the state.
    maps:merge(Cleaned, Fresh);
c2s_stream_started(State, _StreamStart) ->
    State.
×
115

116
%% Hook registered for c2s_post_auth_features.  When this module is loaded
%% for Host, the stream management features for both supported namespaces
%% are put in front of the accumulated feature list; otherwise the list is
%% returned as is.
c2s_stream_features(Acc, Host) ->
    case gen_mod:is_loaded(Host, ?MODULE) of
        false ->
            Acc;
        true ->
            SmFeatures = [#feature_sm{xmlns = ?NS_STREAM_MGMT_2},
                          #feature_sm{xmlns = ?NS_STREAM_MGMT_3}],
            SmFeatures ++ Acc
    end.
124

125
%% Hook for c2s_inline_features.  The accumulator is a {Sasl, Bind, Extra}
%% triple.  When this module is loaded for Host, an sm:3 feature element is
%% prepended to the first list and a bind2 feature naming sm:3 to the second;
%% the third element is never modified.
c2s_inline_features({Sasl, Bind, Extra} = Acc, Host, _State) ->
    case gen_mod:is_loaded(Host, ?MODULE) of
        false ->
            Acc;
        true ->
            SaslFeature = #feature_sm{xmlns = ?NS_STREAM_MGMT_3},
            BindFeature = #bind2_feature{var = ?NS_STREAM_MGMT_3},
            {[SaslFeature | Sasl], [BindFeature | Bind], Extra}
    end.
134

135
%% Hook for c2s_handle_sasl2_inline.  Looks for an inline sm_resume element
%% among Els.  Without one the accumulator is returned unchanged.  On a
%% successful has_resume_data/2 the resume element and any bind2_bind element
%% are removed from the element list and the sm_resumed reply is added to the
%% results.  On failure the state and the element list stay as they were and
%% only the failure element is added to the results.
c2s_handle_sasl2_inline({State, Els, Results} = Acc) ->
    case lists:keytake(sm_resume, 1, Els) of
        false ->
            Acc;
        {value, ResumeEl, OtherEls} ->
            case has_resume_data(State, ResumeEl) of
                {error, FailedEl, _Reason} ->
                    {State, Els, [FailedEl | Results]};
                {ok, ResumedState, ResumedEl} ->
                    {ResumedState,
                     lists:keydelete(bind2_bind, 1, OtherEls),
                     [ResumedEl | Results]}
            end
    end.
148

149
%% Hook for c2s_handle_sasl2_inline_post.  Runs post_resume_tasks/1 when the
%% inline results contain an sm_resumed element; otherwise the state is
%% returned unchanged.
c2s_handle_sasl2_inline_post(State, _Els, Results) ->
    case lists:keymember(sm_resumed, 1, Results) of
        true -> post_resume_tasks(State);
        false -> State
    end.
156

157
%% Hook for c2s_handle_bind2_inline.  If Els contains an sm_enable element,
%% its namespace is stored in the state as `mgmt_xmlns', stream management is
%% enabled through handle_enable_int/2 and the resulting reply element is
%% added to the results.  The element list itself is never changed.
c2s_handle_bind2_inline({State, Els, Results}) ->
    case lists:keyfind(sm_enable, 1, Els) of
        #sm_enable{xmlns = Xmlns} = EnableEl ->
            WithNs = State#{mgmt_xmlns => Xmlns},
            {EnabledState, Reply} = handle_enable_int(WithNs, EnableEl),
            {EnabledState, Els, [Reply | Results]};
        _ ->
            {State, Els, Results}
    end.
165

166
%% Hook for c2s_unauthenticated_packet (also used by c2s_unbinded_packet/2
%% for non-resume requests).  Any stream management element received at this
%% point is answered with a <failed/> element carrying 'not-authorized', and
%% hook processing is stopped.  All other packets leave the state unchanged.
c2s_unauthenticated_packet(#{lang := Lang} = State, Pkt) when ?is_sm_packet(Pkt) ->
    %% XEP-0198 says: "For client-to-server connections, the client MUST NOT
    %% attempt to enable stream management until after it has completed Resource
    %% Binding unless it is resuming a previous session".  However, it also
    %% says: "Stream management errors SHOULD be considered recoverable", so we
    %% won't bail out.
    Err = #sm_failed{reason = 'not-authorized',
                     text = xmpp:mk_text(?T("Unauthorized"), Lang),
                     %% Reply in the namespace the client used (sm:2 or
                     %% sm:3), as the other error paths of this module do,
                     %% instead of always answering with sm:3.
                     xmlns = xmpp:get_ns(Pkt)},
    {stop, send(State, Err)};
c2s_unauthenticated_packet(State, _Pkt) ->
    State.
3✔
178

179
%% Hook for c2s_unbinded_packet.  A resume request is processed by
%% handle_resume/2; whether that succeeds or fails, hook processing stops
%% with the state it returned.  Any other stream management element is
%% treated exactly like one received before authentication.  Everything else
%% passes through.
c2s_unbinded_packet(State, #sm_resume{} = Pkt) ->
    case handle_resume(State, Pkt) of
        {Outcome, NewState} when Outcome == ok; Outcome == error ->
            {stop, NewState}
    end;
c2s_unbinded_packet(State, Pkt) when ?is_sm_packet(Pkt) ->
    c2s_unauthenticated_packet(State, Pkt);
c2s_unbinded_packet(State, _Pkt) ->
    State.
×
190

191
%% Hook for c2s_authenticated_packet.  Stream management elements stop hook
%% processing: they are executed by perform_stream_mgmt/2 when management is
%% `active' or `pending', and handed to negotiate_stream_mgmt/2 in any other
%% state.  All remaining packets only go through update_num_stanzas_in/2.
c2s_authenticated_packet(#{mgmt_state := MgmtState} = State, Pkt)
  when ?is_sm_packet(Pkt) ->
    NewState = case MgmtState of
                   active -> perform_stream_mgmt(Pkt, State);
                   pending -> perform_stream_mgmt(Pkt, State);
                   _ -> negotiate_stream_mgmt(Pkt, State)
               end,
    {stop, NewState};
c2s_authenticated_packet(State, Pkt) ->
    update_num_stanzas_in(State, Pkt).
42,484✔
200

201
%% Hook for c2s_handle_recv.  Only receive results of the form {error, Why}
%% are of interest here:
%%  - an element in one of the stream management namespaces is answered with
%%    a <failed/> 'bad-request' in that same namespace;
%%  - a stanza received while management is `pending' or `active' is still
%%    counted, and unless its type is "result" or "error" the state is
%%    flagged with `mgmt_force_enqueue';
%%  - anything else leaves the state unchanged.
c2s_handle_recv(#{mgmt_state := MgmtState,
                  lang := Lang} = State, El, {error, Why}) ->
    Ns = xmpp:get_ns(El),
    IsStanza = xmpp:is_stanza(El),
    SmInUse = MgmtState == pending orelse MgmtState == active,
    if Ns == ?NS_STREAM_MGMT_2; Ns == ?NS_STREAM_MGMT_3 ->
            Text = xmpp:mk_text(xmpp:io_format_error(Why), Lang),
            send(State, #sm_failed{reason = 'bad-request',
                                   text = Text,
                                   xmlns = Ns});
       IsStanza andalso SmInUse ->
            Counted = update_num_stanzas_in(State, El),
            case xmpp:get_type(El) of
                Type when Type == <<"result">>; Type == <<"error">> ->
                    Counted;
                _ ->
                    Counted#{mgmt_force_enqueue => true}
            end;
       true ->
            State
    end;
c2s_handle_recv(State, _, _) ->
    State.
46,643✔
224

225
%% Hook for c2s_handle_send.  Acts only while management is `pending',
%% `active' or `resumed':
%%  - stanzas are added to the ack queue when need_to_enqueue/2 says so.  If
%%    that pushes the queue over its limit, resending is switched off and a
%%    policy-violation stream error is sent; otherwise an ack request is
%%    issued when the send itself returned `ok';
%%  - an outgoing stream error in the `pending' state stops the c2s process,
%%    recording the error as stop reason;
%%  - everything else leaves the state unchanged.
c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
                  lang := Lang} = State, Pkt, SendResult)
  when MgmtState == pending; MgmtState == active; MgmtState == resumed ->
    IsStanza = xmpp:is_stanza(Pkt),
    if IsStanza ->
            case need_to_enqueue(State, Pkt) of
                {false, Unqueued} ->
                    Unqueued;
                {true, ToQueue} ->
                    case mgmt_queue_add(ToQueue, Pkt) of
                        #{mgmt_max_queue := exceeded} = Overflown ->
                            Err = xmpp:serr_policy_violation(
                                    ?T("Too many unacked stanzas"), Lang),
                            send(Overflown#{mgmt_resend => false}, Err);
                        Queued when SendResult == ok ->
                            send_rack(Queued);
                        Queued ->
                            Queued
                    end
            end;
       is_record(Pkt, stream_error), MgmtState == pending ->
            Mod:stop_async(self()),
            {stop, State#{stop_reason => {stream, {out, Pkt}}}};
       true ->
            %% Stream errors in the `active'/`resumed' states and all other
            %% non-stanza elements need no bookkeeping.
            State
    end;
c2s_handle_send(State, _Pkt, _Result) ->
    State.
56,734✔
262

263
%% Hook for c2s_handle_cast.  A `send_ping' cast is consumed (hook processing
%% stops) while management is `active' or `pending'; only in the `active'
%% state does it result in an ack request via send_rack/1.  Any other cast,
%% or any other management state, leaves the state unchanged.
c2s_handle_cast(State, send_ping) ->
    case State of
        #{mgmt_state := active} -> {stop, send_rack(State)};
        #{mgmt_state := pending} -> {stop, State};
        _ -> State
    end;
c2s_handle_cast(State, _Msg) ->
    State.
×
269

270
%% Hook for c2s_handle_call.  Serves {resume_session, Id} requests:
%%  - when Id equals this session's `mgmt_id', the caller is sent a copy of
%%    the state whose queue was passed through p1_queue:file_to_ram/1, and
%%    this process continues as `resumed' with a cleared queue;
%%  - otherwise the caller receives {error, session_not_found}.
%% Both cases stop hook processing.  Other calls are ignored.
c2s_handle_call(#{mod := Mod} = State, {resume_session, ReqID}, From) ->
    case State of
        #{mgmt_id := ReqID, mgmt_queue := Queue} ->
            Handover = State#{mgmt_queue => p1_queue:file_to_ram(Queue)},
            Mod:reply(From, {resume, Handover}),
            {stop, State#{mgmt_state => resumed,
                          mgmt_queue => p1_queue:clear(Queue)}};
        _ ->
            Mod:reply(From, {error, session_not_found}),
            {stop, State}
    end;
c2s_handle_call(State, _Call, _From) ->
    State.
×
280

281
%% Hook for c2s_handle_info.  Handles, in this order:
%%  1. expiry of the current ack timer: the connection is closed, the stop
%%     reason is recorded and the session moves towards `pending';
%%  2. expiry of the current pending timer while `pending': the process is
%%     asked to stop with a connection-timeout stream error and the
%%     management state becomes `timeout';
%%  3. a late {resume, OldState} reply: the old session's unacked stanzas are
%%     re-routed with resending disabled;
%%  4. stale ack/pending timeouts, which are swallowed.
%% Every other message leaves the state unchanged.
c2s_handle_info(#{mgmt_ack_timer := TRef, jid := JID, mod := Mod} = State,
                {timeout, TRef, ack_timeout}) ->
    ?DEBUG("Timed out waiting for stream management acknowledgement of ~ts",
           [jid:encode(JID)]),
    Closed = Mod:close(State),
    {stop, transition_to_pending(Closed#{stop_reason => {socket, ack_timeout}},
                                 ack_timeout)};
c2s_handle_info(#{mgmt_state := pending, lang := Lang,
                  mgmt_pending_timer := TRef, jid := JID, mod := Mod} = State,
                {timeout, TRef, pending_timeout}) ->
    ?DEBUG("Timed out waiting for resumption of stream for ~ts",
           [jid:encode(JID)]),
    Err = xmpp:serr_connection_timeout(
            ?T("Timed out waiting for stream resumption"), Lang),
    Mod:stop_async(self()),
    {stop, State#{mgmt_state => timeout,
                  stop_reason => {stream, {out, Err}}}};
c2s_handle_info(State, {_Ref, {resume, #{jid := JID} = OldState}}) ->
    %% This happens if the resume_session/1 request timed out; the new session
    %% now receives the late response.
    ?DEBUG("Received old session state for ~ts after failed resumption",
           [jid:encode(JID)]),
    route_unacked_stanzas(OldState#{mgmt_resend => false}),
    {stop, State};
%% Late arrival of an already cancelled timer: we just ignore it.
%% This might happen because misc:cancel_timer/1 doesn't guarantee
%% timer cancellation in the case when p1_server is used.
c2s_handle_info(State, {timeout, _, ack_timeout}) ->
    {stop, State};
c2s_handle_info(State, {timeout, _, pending_timeout}) ->
    {stop, State};
c2s_handle_info(State, _) ->
    State.
49,930✔
313

314
%% Hook for c2s_closed.  A close with a {stream, _} reason is never
%% intercepted.  For any other reason, a session with `active' stream
%% management is handed to transition_to_pending/2 and hook processing stops;
%% all other sessions are left alone.
c2s_closed(State, Reason) ->
    case {Reason, State} of
        {{stream, _}, _} ->
            State;
        {_, #{mgmt_state := active}} ->
            {stop, transition_to_pending(State, Reason)};
        _ ->
            State
    end.
73✔
320

321
%% Hook for c2s_terminated.
%%  - For the former process of a resumed session: the session entry is
%%    closed, stanzas that were queued late are re-routed, the process
%%    message queue is bounced, and hook processing stops.
%%  - For a session that has a management id: when the session timed out the
%%    inbound stanza count is stored under that id; in every case the unacked
%%    stanzas are handed to route_unacked_stanzas/1.
%%  - Anything else is passed through.
c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason) ->
    ?DEBUG("Closing former stream of resumed session for ~ts",
           [jid:encode(JID)]),
    {LUser, LServer, LResource} = jid:tolower(JID),
    ejabberd_sm:close_session(SID, LUser, LServer, LResource),
    route_late_queue_after_resume(State),
    ejabberd_c2s:bounce_message_queue(SID, JID),
    {stop, State};
c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
                 mgmt_id := MgmtID, jid := JID} = State, _Reason) ->
    if MgmtState == timeout ->
            store_stanzas_in(jid:tolower(JID), MgmtID, In);
       true ->
            ok
    end,
    route_unacked_stanzas(State),
    State;
c2s_terminated(State, _Reason) ->
    State.
2,080✔
341

342
%%%===================================================================
343
%%% Adjust pending session timeout / access queue
344
%%%===================================================================
345
%% Returns the value stored under `mgmt_timeout' in the c2s state.
-spec get_resume_timeout(state()) -> non_neg_integer().
get_resume_timeout(#{mgmt_timeout := ResumeTimeout}) -> ResumeTimeout.
8✔
348

349
%% Stores a new `mgmt_timeout' in the c2s state.  When the value is already
%% the stored one nothing happens; otherwise restart_pending_timer/2 is
%% invoked with the new value before the state is updated.
-spec set_resume_timeout(state(), non_neg_integer()) -> state().
set_resume_timeout(State, Timeout) ->
    case State of
        #{mgmt_timeout := Timeout} ->
            State;
        _ ->
            Restarted = restart_pending_timer(State, Timeout),
            Restarted#{mgmt_timeout => Timeout}
    end.
16✔
355

356
%% Walks the queue from its head and returns the packet of the first entry
%% for which Pred returns `true', or `none' when the queue is exhausted.
%% Only the third element of each queue entry is passed to Pred.
-spec queue_find(fun((stanza()) -> boolean()), queue())
      -> stanza() | none.
queue_find(Pred, Queue) ->
    case p1_queue:out(Queue) of
        {empty, _} ->
            none;
        {{value, {_, _, Candidate}}, Remaining} ->
            case Pred(Candidate) of
                true -> Candidate;
                false -> queue_find(Pred, Remaining)
            end
    end.
370

371
%%%===================================================================
372
%%% Internal functions
373
%%%===================================================================
374
%% Handles a stream management element while management is not in use.
%% An enable request is processed by handle_enable/2, with the request's
%% namespace stored as `mgmt_xmlns'.  Ack, ack-request and resume elements
%% are answered with a <failed/> 'unexpected-request' in the request's
%% namespace.
-spec negotiate_stream_mgmt(xmpp_element(), state()) -> state().
negotiate_stream_mgmt(Pkt, #{lang := Lang} = State) ->
    Ns = xmpp:get_ns(Pkt),
    case Pkt of
        #sm_enable{} ->
            handle_enable(State#{mgmt_xmlns => Ns}, Pkt);
        _ when is_record(Pkt, sm_a);
               is_record(Pkt, sm_r);
               is_record(Pkt, sm_resume) ->
            Text = xmpp:mk_text(?T("Stream management is not enabled"), Lang),
            send(State, #sm_failed{reason = 'unexpected-request',
                                   text = Text,
                                   xmlns = Ns})
    end.
389

390
%% Handles a stream management element while management is in use.
%% The element must be in the namespace recorded in `mgmt_xmlns'; if it is
%% not, an "Unsupported version" failure is sent.  Ack requests and acks are
%% dispatched to handle_r/1 and handle_a/2; enable and resume requests are
%% refused because management is already enabled.  All failures use the
%% 'unexpected-request' reason and the recorded namespace.
-spec perform_stream_mgmt(xmpp_element(), state()) -> state().
perform_stream_mgmt(Pkt, #{mgmt_xmlns := Xmlns, lang := Lang} = State) ->
    Refuse = fun(Txt) ->
                     send(State, #sm_failed{reason = 'unexpected-request',
                                            text = xmpp:mk_text(Txt, Lang),
                                            xmlns = Xmlns})
             end,
    case xmpp:get_ns(Pkt) =:= Xmlns of
        false ->
            Refuse(?T("Unsupported version"));
        true ->
            case Pkt of
                #sm_r{} ->
                    handle_r(State);
                #sm_a{} ->
                    handle_a(State, Pkt);
                _ when is_record(Pkt, sm_enable);
                       is_record(Pkt, sm_resume) ->
                    Refuse(?T("Stream management is already enabled"))
            end
    end.
412

413
%% Enables stream management for the session and builds the <enabled/>
%% reply without sending it.
%%
%% A fresh management id is generated first.  The resumption timeout is
%%  - 0 when the client did not ask for resumption,
%%  - the client's `max' multiplied by 1000 when that is positive and does
%%    not exceed `mgmt_max_timeout',
%%  - the current `mgmt_timeout' otherwise.
%% With a positive timeout the reply advertises resumption together with the
%% encoded id and the timeout divided by 1000; otherwise it only carries the
%% namespace.  The returned state is `active', has a new queue of the
%% configured type and stores the chosen timeout.
-spec handle_enable_int(state(), sm_enable()) -> {state(), sm_enabled()}.
handle_enable_int(#{mgmt_timeout := DefaultTimeout,
                    mgmt_queue_type := QueueType,
                    mgmt_max_timeout := MaxTimeout,
                    mgmt_xmlns := Xmlns, jid := JID} = State,
                  #sm_enable{resume = Resume, max = Max}) ->
    WithId = State#{mgmt_id => make_id()},
    Timeout = if Resume == false ->
                      0;
                 Max /= undefined, Max > 0, Max * 1000 =< MaxTimeout ->
                      Max * 1000;
                 true ->
                      DefaultTimeout
              end,
    Reply = case Timeout > 0 of
                true ->
                    ?DEBUG("Stream management with resumption enabled for ~ts",
                           [jid:encode(JID)]),
                    #sm_enabled{xmlns = Xmlns,
                                id = encode_id(WithId),
                                resume = true,
                                max = Timeout div 1000};
                false ->
                    ?DEBUG("Stream management without resumption enabled for ~ts",
                           [jid:encode(JID)]),
                    #sm_enabled{xmlns = Xmlns}
            end,
    Enabled = WithId#{mgmt_state => active,
                      mgmt_queue => p1_queue:new(QueueType),
                      mgmt_timeout => Timeout},
    {Enabled, Reply}.
11✔
443

444
%% Enables stream management through handle_enable_int/2 and sends the reply
%% it produced on the updated state.
-spec handle_enable(state(), sm_enable()) -> state().
handle_enable(State, Enable) ->
    {Enabled, Reply} = handle_enable_int(State, Enable),
    send(Enabled, Reply).
11✔
448

449
%% Answers an ack request: sends an <a/> element in the session's management
%% namespace whose `h' is the current `mgmt_stanzas_in' counter.
-spec handle_r(state()) -> state().
handle_r(#{mgmt_xmlns := Xmlns, mgmt_stanzas_in := Handled} = State) ->
    send(State, #sm_a{xmlns = Xmlns, h = Handled}).
3✔
453

454
%% Processes an ack from the client: the reported `h' value is checked by
%% check_h_attribute/2, after which resend_rack/1 runs on the resulting
%% state.
-spec handle_a(state(), sm_a()) -> state().
handle_a(State, #sm_a{h = Handled}) ->
    resend_rack(check_h_attribute(State, Handled)).
1✔
458

459
%% Processes a <resume/> request.  On success the <resumed/> element is sent
%% on the inherited state and post_resume_tasks/1 is run.  On failure the
%% reason is logged via log_resumption_error/3 and the failure element is
%% sent on the original state.
-spec handle_resume(state(), sm_resume()) -> {ok, state()} | {error, state()}.
handle_resume(#{user := User, lserver := LServer} = State,
              #sm_resume{} = Resume) ->
    case has_resume_data(State, Resume) of
        {error, FailedEl, Reason} ->
            log_resumption_error(User, LServer, Reason),
            {error, send(State, FailedEl)};
        {ok, Inherited, ResumedEl} ->
            Answered = send(Inherited, ResumedEl),
            {ok, post_resume_tasks(Answered)}
    end.
470

471
%% Tries to take over the session identified by the request's `previd'.
%%
%% On success the client's `h' is applied through check_h_attribute/2 and the
%% result is {ok, State, Resumed}, where Resumed carries the inherited
%% namespace and inbound counter.  On failure the result is
%% {error, Failed, Reason}, where Failed is an 'item-not-found' element in
%% the request's namespace; its `h' is set only when
%% inherit_session_state/2 supplied a value.  Nothing is sent here.
-spec has_resume_data(state(), sm_resume()) ->
    {ok, state(), sm_resumed()} | {error, sm_failed(), error_reason()}.
has_resume_data(#{lang := Lang} = State,
                #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) ->
    NotFound = fun(Why) ->
                       #sm_failed{reason = 'item-not-found',
                                  text = xmpp:mk_text(format_error(Why), Lang),
                                  xmlns = Xmlns}
               end,
    case inherit_session_state(State, PrevID) of
        {ok, Inherited} ->
            Checked = check_h_attribute(Inherited, H),
            #{mgmt_xmlns := AttrXmlns, mgmt_stanzas_in := AttrH} = Checked,
            Resumed = #sm_resumed{xmlns = AttrXmlns,
                                  h = AttrH,
                                  previd = PrevID},
            {ok, Checked, Resumed};
        {error, Reason, InH} ->
            Failed = NotFound(Reason),
            {error, Failed#sm_failed{h = InH}, Reason};
        {error, Reason} ->
            {error, NotFound(Reason), Reason}
    end.
491

492
%% Work done right after a session was resumed, in this order: resend the
%% unacknowledged stanzas, send an ack request in the session's namespace,
%% run the c2s_session_resumed hook fold and log the resumption.  Returns the
%% state produced by the hook.
-spec post_resume_tasks(state()) -> state().
post_resume_tasks(#{lserver := LServer, socket := Socket, jid := JID,
                    mgmt_xmlns := AttrXmlns} = State) ->
    Resent = resend_unacked_stanzas(State),
    Requested = send(Resent, #sm_r{xmlns = AttrXmlns}),
    Resumed = ejabberd_hooks:run_fold(c2s_session_resumed, LServer,
                                      Requested, []),
    ?INFO_MSG("(~ts) Resumed session for ~ts",
              [xmpp_socket:pp(Socket), jid:encode(JID)]),
    Resumed.
1✔
501

502
%% Moves an `active' session into the `pending' state after its connection
%% went away.
%%  - With a resumption timeout of 0 the process is simply asked to stop.
%%  - Otherwise the ack timer is cancelled, the event is logged, a
%%    `pending_timeout' timer is started for the resumption timeout and the
%%    c2s_session_pending hook fold is run on the updated state.
%%  - Sessions that are not `active' are returned unchanged.
-spec transition_to_pending(state(), _) -> state().
transition_to_pending(#{mgmt_state := active, mod := Mod,
                        mgmt_timeout := 0} = State, _Reason) ->
    Mod:stop_async(self()),
    State;
transition_to_pending(#{mgmt_state := active, jid := JID, socket := Socket,
                        lserver := LServer, mgmt_timeout := Timeout} = State,
                      Reason) ->
    NoAckTimer = cancel_ack_timer(State),
    ?INFO_MSG("(~ts) Closing c2s connection for ~ts: ~ts; "
              "waiting ~B seconds for stream resumption",
              [xmpp_socket:pp(Socket), jid:encode(JID),
               format_reason(State, Reason), Timeout div 1000]),
    PendingTimer = erlang:start_timer(Timeout, self(), pending_timeout),
    Pending = NoAckTimer#{mgmt_state => pending,
                          mgmt_pending_timer => PendingTimer},
    ejabberd_hooks:run_fold(c2s_session_pending, LServer, Pending, []);
transition_to_pending(State, _Reason) ->
    State.
×
520

521
%% Applies an `h' value reported by the client.  If it is greater than the
%% number of stanzas sent, a warning is logged, resending is switched off and
%% an undefined-condition stream error is sent.  Otherwise the acknowledged
%% entries are dropped from the queue via mgmt_queue_drop/2.
-spec check_h_attribute(state(), non_neg_integer()) -> state().
check_h_attribute(#{mgmt_stanzas_out := Sent, jid := JID,
                    lang := Lang} = State, H)
  when H > Sent ->
    ?WARNING_MSG("~ts acknowledged ~B stanzas, but only ~B were sent",
                 [jid:encode(JID), H, Sent]),
    Err = xmpp:serr_undefined_condition(
            ?T("Client acknowledged more stanzas than sent by server"), Lang),
    send(State#{mgmt_resend => false}, Err);
check_h_attribute(#{mgmt_stanzas_out := Sent, jid := JID} = State, H) ->
    ?DEBUG("~ts acknowledged ~B of ~B stanzas",
           [jid:encode(JID), H, Sent]),
    mgmt_queue_drop(State, H).
2✔
535

536
%% Counts an inbound element.  While management is `active' or `pending',
%% the `mgmt_stanzas_in' counter is incremented for stanzas, wrapping from
%% 4294967295 back to 0; non-stanzas keep the counter as it is.  In any other
%% state the session state is returned untouched.
-spec update_num_stanzas_in(state(), xmpp_element() | xmlel()) -> state().
update_num_stanzas_in(#{mgmt_state := MgmtState,
                        mgmt_stanzas_in := Count} = State, El)
  when MgmtState == active; MgmtState == pending ->
    Next = case xmpp:is_stanza(El) of
               true when Count =:= 4294967295 -> 0;
               true -> Count + 1;
               false -> Count
           end,
    State#{mgmt_stanzas_in => Next};
update_num_stanzas_in(State, _El) ->
    State.
42,480✔
551

552
%% Sends an ack request unless one is already outstanding.  A state that
%% holds an `mgmt_ack_timer' is returned unchanged.  Otherwise the current
%% outbound counter is remembered in `mgmt_stanzas_req', the ack timer is
%% started and an <r/> element is sent in the session's namespace.
-spec send_rack(state()) -> state().
send_rack(#{mgmt_ack_timer := _} = State) ->
    State;
send_rack(#{mgmt_xmlns := Xmlns,
            mgmt_stanzas_out := Sent} = State) ->
    Armed = start_ack_timer(State#{mgmt_stanzas_req => Sent}),
    send(Armed, #sm_r{xmlns = Xmlns}).
1✔
560

561
%% Called after an ack was processed.  When an ack timer is present it is
%% cancelled; a new ack request is then issued only if stanzas were sent
%% after the last request and the queue is not empty.  A state without ack
%% timer is returned unchanged.
-spec resend_rack(state()) -> state().
resend_rack(#{mgmt_ack_timer := _,
              mgmt_queue := Queue,
              mgmt_stanzas_out := Sent,
              mgmt_stanzas_req := Requested} = State) ->
    Cancelled = cancel_ack_timer(State),
    NeedsRequest = Requested < Sent andalso not p1_queue:is_empty(Queue),
    case NeedsRequest of
        true -> send_rack(Cancelled);
        false -> Cancelled
    end;
resend_rack(State) ->
    State.
1✔
573

574
%% Appends an outgoing packet to the ack queue.  The outbound counter is
%% advanced first (wrapping from 4294967295 to 0) and the packet is stored as
%% {Counter, Timestamp, Packet}.  The queue length is then checked by
%% check_queue_length/1.
-spec mgmt_queue_add(state(), xmlel() | xmpp_element()) -> state().
mgmt_queue_add(#{mgmt_stanzas_out := Sent,
                 mgmt_queue := Queue} = State, Pkt) ->
    Seq = if Sent =:= 4294967295 -> 0;
             true -> Sent + 1
          end,
    Entry = {Seq, erlang:timestamp(), Pkt},
    check_queue_length(State#{mgmt_queue => p1_queue:in(Entry, Queue),
                              mgmt_stanzas_out => Seq}).
31✔
584

585
%% Removes entries from the head of the ack queue for as long as their
%% sequence number is less than or equal to NumHandled.
-spec mgmt_queue_drop(state(), non_neg_integer()) -> state().
mgmt_queue_drop(#{mgmt_queue := Queue} = State, NumHandled) ->
    IsAcked = fun({Seq, _Time, _El}) -> Seq =< NumHandled end,
    State#{mgmt_queue => p1_queue:dropwhile(IsAcked, Queue)}.
2✔
590

591
%% Marks the session as over its ack-queue limit when appropriate.  Nothing
%% is checked when `mgmt_max_queue' is `infinity' or already `exceeded'.
%% Otherwise `mgmt_max_queue' is replaced by `exceeded' as soon as the queue
%% holds more entries than the limit.
-spec check_queue_length(state()) -> state().
check_queue_length(#{mgmt_max_queue := Limit} = State)
  when Limit == infinity; Limit == exceeded ->
    State;
check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) ->
    OverLimit = p1_queue:len(Queue) > Limit,
    if OverLimit -> State#{mgmt_max_queue => exceeded};
       true -> State
    end.
602

603
%% Passes every packet still held in a non-empty queue to
%% ejabberd_router:route/1.  Called by c2s_terminated/2 for the former
%% process of a resumed session.  Does nothing when the queue is empty or
%% absent.
-spec route_late_queue_after_resume(state()) -> ok.
route_late_queue_after_resume(#{mgmt_queue := Queue, jid := JID})
    when ?qlen(Queue) > 0 ->
    ?DEBUG("Re-routing ~B late queued packets to ~ts",
           [p1_queue:len(Queue), jid:encode(JID)]),
    Reroute = fun({_, _Time, Pkt}) -> ejabberd_router:route(Pkt) end,
    p1_queue:foreach(Reroute, Queue);
route_late_queue_after_resume(_State) ->
    ok.
1✔
614

615
%% Sends every entry of a non-empty ack queue to the client again, threading
%% the state through send/2.  Each packet first goes through
%% add_resent_delay_info/3 with its queue timestamp; stanzas are additionally
%% tagged with the `mgmt_is_resent' meta flag.  Applies only in the `active',
%% `pending' and `timeout' states; otherwise the state is returned as is.
-spec resend_unacked_stanzas(state()) -> state().
resend_unacked_stanzas(#{mgmt_state := MgmtState,
                         mgmt_queue := Queue,
                         jid := JID} = State)
  when (MgmtState == active orelse
        MgmtState == pending orelse
        MgmtState == timeout) andalso ?qlen(Queue) > 0 ->
    ?DEBUG("Resending ~B unacknowledged stanza(s) to ~ts",
           [p1_queue:len(Queue), jid:encode(JID)]),
    Resend = fun({_, Time, Pkt}, Acc) ->
                     Delayed = add_resent_delay_info(Acc, Pkt, Time),
                     Marked = if ?is_stanza(Delayed) ->
                                      xmpp:put_meta(Delayed, mgmt_is_resent, true);
                                 true ->
                                      Delayed
                              end,
                     send(Acc, Marked)
             end,
    p1_queue:foldl(Resend, State, Queue);
resend_unacked_stanzas(State) ->
    State.
×
636

637
%% Disposes of the stanzas left in the ack queue of a session that is going
%% away.  Applies only in the `active', `pending' and `timeout' states with
%% a non-empty queue; otherwise nothing is done.
%%
%% `mgmt_resend' decides whether messages may be routed again: a boolean is
%% taken as is, while `if_offline' allows it only when the user has no other
%% resource online (none at all, or just this session's resource).
%%
%% Per queue entry:
%%  - presence stanzas are dropped;
%%  - IQs are bounced with service-unavailable;
%%  - carbon copies are dropped (see the XEP-0280 note below);
%%  - other messages are re-routed, dropped because they are already
%%    archived, or bounced with service-unavailable, depending on whether
%%    mod_offline is loaded, on the resend decision and on the
%%    `mam_archived' meta flag;
%%  - anything else (raw elements) is only logged.
-spec route_unacked_stanzas(state()) -> ok.
route_unacked_stanzas(#{mgmt_state := MgmtState,
                        mgmt_resend := MgmtResend,
                        lang := Lang, user := User,
                        jid := JID, lserver := LServer,
                        mgmt_queue := Queue,
                        resource := Resource} = State)
  when (MgmtState == active orelse
        MgmtState == pending orelse
        MgmtState == timeout) andalso ?qlen(Queue) > 0 ->
    ResendOnTimeout = case MgmtResend of
                          Resend when is_boolean(Resend) ->
                              Resend;
                          if_offline ->
                              case ejabberd_sm:get_user_resources(User, LServer) of
                                  [Resource] ->
                                      %% Same resource opened new session
                                      true;
                                  [] -> true;
                                  _ -> false
                              end
                      end,
    ?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~ts",
           [p1_queue:len(Queue), jid:encode(JID)]),
    ModOfflineEnabled = gen_mod:is_loaded(LServer, mod_offline),
    p1_queue:foreach(
      fun({_, _Time, #presence{from = From}}) ->
              ?DEBUG("Dropping presence stanza from ~ts", [jid:encode(From)]);
         ({_, _Time, #iq{} = El}) ->
              Txt = ?T("User session terminated"),
              ejabberd_router:route_error(
                El, xmpp:err_service_unavailable(Txt, Lang));
         ({_, _Time, #message{from = From, meta = #{carbon_copy := true}}}) ->
              %% XEP-0280 says: "When a receiving server attempts to deliver a
              %% forked message, and that message bounces with an error for
              %% any reason, the receiving server MUST NOT forward that error
              %% back to the original sender."  Resending such a stanza could
              %% easily lead to unexpected results as well.
              ?DEBUG("Dropping forwarded message stanza from ~ts",
                     [jid:encode(From)]);
         ({_, Time, #message{} = Msg}) ->
              case {ModOfflineEnabled, ResendOnTimeout,
                    xmpp:get_meta(Msg, mam_archived, false)} of
                  Val when Val == {true, true, false};
                           Val == {true, true, true};
                           Val == {false, true, false} ->
                      NewEl = add_resent_delay_info(State, Msg, Time),
                      ejabberd_router:route(NewEl);
                  {_, _, true} ->
                      %% ~ts (not ~s) so that non-ASCII JIDs are rendered as
                      %% Unicode, like in every other log call of this module.
                      ?DEBUG("Dropping archived message stanza from ~ts",
                             [jid:encode(xmpp:get_from(Msg))]);
                  _ ->
                      Txt = ?T("User session terminated"),
                      ejabberd_router:route_error(
                        Msg, xmpp:err_service_unavailable(Txt, Lang))
              end;
         ({_, _Time, El}) ->
              %% Raw element of type 'error' resulting from a validation error
              %% We cannot pass it to the router, it will generate an error
              ?DEBUG("Do not route raw element from ack queue: ~p", [El])
      end, Queue);
route_unacked_stanzas(_State) ->
    ok.
1✔
700

701
%% Try to take over the stream management state of an earlier session
%% identified by the client-supplied PrevID.
%%
%% Returns {ok, NewState} when the old session process handed over its
%% state, {error, Reason, H} when the old session is gone but its inbound
%% stanza counter H was still cached, and {error, Reason} otherwise.
-spec inherit_session_state(state(), binary()) -> {ok, state()} |
                                                  {error, error_reason()} |
                                                  {error, error_reason(), non_neg_integer()}.
inherit_session_state(#{user := U, server := S,
                        mgmt_queue_type := QueueType} = State, PrevID) ->
    case decode_id(PrevID) of
        error ->
            {error, invalid_previd};
        {ok, {R, MgmtID}} ->
            case ejabberd_sm:get_session_sid(U, S, R) of
                {_, OldPID} = OldSID ->
                    %% Only the resume_session/3 call itself is protected by
                    %% the 'catch' section; the 'of' branches are not.
                    try resume_session(OldPID, MgmtID, State) of
                        {resume, #{mgmt_xmlns := Xmlns,
                                   mgmt_queue := OldQueue,
                                   mgmt_timeout := Timeout,
                                   mgmt_stanzas_in := NumIn,
                                   mgmt_stanzas_out := NumOut} = OldState} ->
                            Copied = ejabberd_c2s:copy_state(State, OldState),
                            NewQueue = if QueueType =:= ram ->
                                               OldQueue;
                                          true ->
                                               p1_queue:ram_to_file(OldQueue)
                                       end,
                            Resumed = maps:merge(
                                        Copied,
                                        #{sid => ejabberd_sm:make_sid(),
                                          mgmt_id => MgmtID,
                                          mgmt_xmlns => Xmlns,
                                          mgmt_queue => NewQueue,
                                          mgmt_timeout => Timeout,
                                          mgmt_stanzas_in => NumIn,
                                          mgmt_stanzas_out => NumOut,
                                          mgmt_state => active}),
                            %% Open the new session before stopping the old
                            %% process.
                            Opened = ejabberd_c2s:open_session(Resumed),
                            ejabberd_c2s:stop_async(OldPID),
                            {ok, Opened};
                        {error, _} = Err ->
                            Err
                    catch
                        exit:{noproc, _} ->
                            {error, session_is_dead};
                        exit:{Why, _} when Why == normal; Why == shutdown ->
                            {error, session_has_exited};
                        exit:{killed, _} ->
                            {error, session_was_killed};
                        exit:{timeout, _} ->
                            %% The old process did not answer in time: close
                            %% its session and ask it to stop.
                            ejabberd_sm:close_session(OldSID, U, S, R),
                            ejabberd_c2s:stop_async(OldPID),
                            {error, session_copy_timed_out}
                    end;
                none ->
                    %% No live session; check the cache of last handled
                    %% stanza counters.
                    case pop_stanzas_in({U, S, R}, MgmtID) of
                        {ok, H} ->
                            {error, session_timed_out, H};
                        error ->
                            {error, session_not_found}
                    end
            end
    end.
758

759
%% Ask the c2s process PID to hand over its state for the stream management
%% session MgmtID. The call is made with a 15 second timeout.
-spec resume_session(pid(), id(), state()) -> {resume, state()} |
                                              {error, error_reason()}.
resume_session(PID, MgmtID, _State) ->
    Request = {resume_session, MgmtID},
    CallTimeout = timer:seconds(15),
    ejabberd_c2s:call(PID, Request, CallTimeout).
1✔
763

764
%% Stamp a message or presence stanza with delay information carrying the
%% original queueing time, the local server's JID and the text "Resent".
%% Any other element is returned unchanged (not implemented yet).
-spec add_resent_delay_info(state(), stanza(), erlang:timestamp()) -> stanza();
                           (state(), xmlel(), erlang:timestamp()) -> xmlel().
add_resent_delay_info(#{lserver := LServer}, #message{} = Msg, Time) ->
    misc:add_delay_info(Msg, jid:make(LServer), Time, <<"Resent">>);
add_resent_delay_info(#{lserver := LServer}, #presence{} = Pres, Time) ->
    misc:add_delay_info(Pres, jid:make(LServer), Time, <<"Resent">>);
add_resent_delay_info(_State, Other, _Time) ->
    %% TODO
    Other.
×
772

773
%% Send Pkt through the callback module stored under the 'mod' key of the
%% state and return the state produced by that module.
-spec send(state(), xmpp_element()) -> state().
send(#{mod := Callback} = State, Pkt) ->
    Callback:send(State, Pkt).
21✔
776

777
%% Replace a running pending timer with a fresh one firing after NewTimeout
%% milliseconds. A state without a 'mgmt_pending_timer' key is returned
%% untouched.
-spec restart_pending_timer(state(), non_neg_integer()) -> state().
restart_pending_timer(#{mgmt_pending_timer := OldTimer} = State, NewTimeout) ->
    misc:cancel_timer(OldTimer),
    State#{mgmt_pending_timer =>
               erlang:start_timer(NewTimeout, self(), pending_timeout)};
restart_pending_timer(State, _NewTimeout) ->
    State.
×
784

785
%% Arm the acknowledgement timer and remember its reference under
%% 'mgmt_ack_timer'. With an 'infinity' ack timeout no timer is started and
%% the state is returned as is.
-spec start_ack_timer(state()) -> state().
start_ack_timer(#{mgmt_ack_timeout := AckTimeout} = State) ->
    case AckTimeout of
        infinity ->
            State;
        _ ->
            Timer = erlang:start_timer(AckTimeout, self(), ack_timeout),
            State#{mgmt_ack_timer => Timer}
    end.
1✔
791

792
%% Cancel the acknowledgement timer, if one is recorded in the state, and
%% drop its reference. A state without 'mgmt_ack_timer' is returned as is.
-spec cancel_ack_timer(state()) -> state().
cancel_ack_timer(State) ->
    case State of
        #{mgmt_ack_timer := Timer} ->
            misc:cancel_timer(Timer),
            maps:remove(mgmt_ack_timer, State);
        _ ->
            State
    end.
11✔
798

799
%% Decide whether an outgoing packet has to be put on the ack queue.
%%
%% Stanzas are enqueued unless their 'mgmt_is_resent' meta flag is set.
%% A raw #xmlel{} is enqueued only when 'mgmt_force_enqueue' is 'true' in
%% the state; in that case the 'mgmt_force_enqueue' and 'mgmt_is_resent'
%% keys are removed from the returned state. Everything else is not
%% enqueued.
-spec need_to_enqueue(state(), xmlel() | stanza()) -> {boolean(), state()}.
need_to_enqueue(State, Pkt) when ?is_stanza(Pkt) ->
    AlreadyResent = xmpp:get_meta(Pkt, mgmt_is_resent, false),
    {not AlreadyResent, State};
need_to_enqueue(#{mgmt_force_enqueue := true} = State, #xmlel{}) ->
    {true, maps:without([mgmt_force_enqueue, mgmt_is_resent], State)};
need_to_enqueue(State, _Other) ->
    {false, State}.
×
808

809
%% Generate a new stream management ID from 8 random bytes.
-spec make_id() -> id().
make_id() ->
    IdSize = 8,
    p1_rand:bytes(IdSize).
11✔
812

813
%% Build the identifier handed to the client: the {Resource, MgmtID} pair
%% serialized to base64. decode_id/1 is the inverse operation.
-spec encode_id(state()) -> binary().
encode_id(#{mgmt_id := MgmtID, resource := Resource}) ->
    Pair = {Resource, MgmtID},
    misc:term_to_base64(Pair).
10✔
816

817
%% Decode an identifier produced by encode_id/1. Anything that does not
%% decode to a pair of binaries is rejected with 'error'.
-spec decode_id(binary()) -> {ok, {binary(), id()}} | error.
decode_id(Encoded) ->
    case misc:base64_to_term(Encoded) of
        {term, {Resource, MgmtID} = Pair}
          when is_binary(Resource), is_binary(MgmtID) ->
            {ok, Pair};
        _ ->
            error
    end.
826

827
%%%===================================================================
828
%%% Formatters and Logging
829
%%%===================================================================
830
%% Map a resumption failure reason to its (translatable) description.
-spec format_error(error_reason()) -> binary().
%% The 'previd' supplied by the client could not be decoded.
format_error(invalid_previd) ->
    ?T("Invalid 'previd' value");
%% No trace of the previous session was found.
format_error(session_not_found) ->
    ?T("Previous session not found");
%% The previous session is gone, but its stanza counter was still cached.
format_error(session_timed_out) ->
    ?T("Previous session timed out");
%% Failures while asking the previous session process for its state.
format_error(session_is_dead) ->
    ?T("Previous session PID is dead");
format_error(session_has_exited) ->
    ?T("Previous session PID has exited");
format_error(session_was_killed) ->
    ?T("Previous session PID has been killed");
format_error(session_copy_timed_out) ->
    ?T("Session state copying timed out").
×
845

846
%% Produce a human-readable description of a session stop reason.
%%
%% An 'ack_timeout' reason, or a state whose 'stop_reason' is
%% {socket, ack_timeout}, yields the stream management specific text; any
%% other reason is delegated to ejabberd_c2s:format_reason/2.
-spec format_reason(state(), term()) -> binary().
format_reason(State, Reason) ->
    case {Reason, State} of
        {ack_timeout, _} ->
            <<"Timed out waiting for stream acknowledgement">>;
        {_, #{stop_reason := {socket, ack_timeout}}} ->
            format_reason(State, ack_timeout);
        _ ->
            ejabberd_c2s:format_reason(State, Reason)
    end.
11✔
853

854
%% Log a failed resumption attempt. An invalid 'previd' is logged as a
%% warning, every other reason at info level.
-spec log_resumption_error(binary(), binary(), error_reason()) -> ok.
log_resumption_error(User, Server, invalid_previd = Reason) ->
    ?WARNING_MSG("Cannot resume session for ~ts@~ts: ~ts",
                 [User, Server, format_error(Reason)]);
log_resumption_error(User, Server, Reason) ->
    ?INFO_MSG("Cannot resume session for ~ts@~ts: ~ts",
              [User, Server, format_error(Reason)]).
1✔
862

863
%%%===================================================================
864
%%% Cache-like storage for last handled stanzas
865
%%%===================================================================
866
%% Create the cache that remembers the last handled stanza counters, using
%% the settings derived from the module options.
init_cache(Opts) ->
    CacheOpts = cache_opts(Opts),
    ets_cache:new(?STREAM_MGMT_CACHE, CacheOpts).
98✔
868

869
%% Translate the module's 'cache_size' and 'cache_life_time' options into
%% ets_cache settings; the cache type is always 'ordered_set'.
cache_opts(Opts) ->
    MaxSize = mod_stream_mgmt_opt:cache_size(Opts),
    LifeTime = mod_stream_mgmt_opt:cache_life_time(Opts),
    [{max_size, MaxSize},
     {life_time, LifeTime},
     {type, ordered_set}].
873

874
%% Remember the number of inbound stanzas handled for session
%% {LJID, MgmtID}; the insert is issued for all cluster nodes.
-spec store_stanzas_in(ljid(), id(), non_neg_integer()) -> boolean().
store_stanzas_in(LJID, MgmtID, Num) ->
    Nodes = ejabberd_cluster:get_nodes(),
    ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, MgmtID}, Num, Nodes).
878

879
%% Fetch the cached inbound stanza counter for session {LJID, MgmtID}.
%% On a hit, every cached entry of that LJID (whatever its ID) is deleted
%% on all cluster nodes before the value is returned.
-spec pop_stanzas_in(ljid(), id()) -> {ok, non_neg_integer()} | error.
pop_stanzas_in(LJID, MgmtID) ->
    case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, MgmtID}) of
        error ->
            error;
        {ok, _Num} = Hit ->
            ets_cache:match_delete(?STREAM_MGMT_CACHE, {LJID, '_'},
                                   ejabberd_cluster:get_nodes()),
            Hit
    end.
889

890
%%%===================================================================
891
%%% Configuration processing
892
%%%===================================================================
893
%% Configured 'max_ack_queue' option for Host.
get_max_ack_queue(Host) ->
    mod_stream_mgmt_opt:max_ack_queue(Host).
4,111✔
895

896
%% Configured 'resume_timeout' option for Host.
get_configured_resume_timeout(Host) ->
    mod_stream_mgmt_opt:resume_timeout(Host).
4,111✔
898

899
%% Effective upper bound for client-requested resume timeouts.
%% The configured 'max_resume_timeout' is used only when it is set and not
%% smaller than ResumeTimeout; otherwise ResumeTimeout itself is the limit.
get_max_resume_timeout(Host, ResumeTimeout) ->
    case mod_stream_mgmt_opt:max_resume_timeout(Host) of
        %% 'undefined' must be excluded explicitly: as an atom it would
        %% compare greater than any number.
        Max when Max =/= undefined, Max >= ResumeTimeout ->
            Max;
        _ ->
            ResumeTimeout
    end.
905

906
%% Configured 'ack_timeout' option for Host.
get_ack_timeout(Host) ->
    mod_stream_mgmt_opt:ack_timeout(Host).
4,111✔
908

909
%% Configured 'resend_on_timeout' option for Host.
get_resend_on_timeout(Host) ->
    mod_stream_mgmt_opt:resend_on_timeout(Host).
4,111✔
911

912
%% Configured 'queue_type' option for Host.
get_queue_type(Host) ->
    mod_stream_mgmt_opt:queue_type(Host).
4,111✔
914

915
%% Validators for the module options (gen_mod callback).
%% One clause per option; each returns the econf validator for its value.

%% Queue and cache sizes: validated with econf:pos_int(infinity).
mod_opt_type(max_ack_queue) ->
    econf:pos_int(infinity);
mod_opt_type(cache_size) ->
    econf:pos_int(infinity);
%% Resume timeouts: either econf:int(0, 0) or a timeout given in seconds.
mod_opt_type(resume_timeout) ->
    econf:either(
      econf:int(0, 0),
      econf:timeout(second));
mod_opt_type(max_resume_timeout) ->
    econf:either(
      econf:int(0, 0),
      econf:timeout(second));
%% Timeouts validated with econf:timeout(second, infinity).
mod_opt_type(ack_timeout) ->
    econf:timeout(second, infinity);
mod_opt_type(cache_life_time) ->
    econf:timeout(second, infinity);
%% Either the atom 'if_offline' or a boolean.
mod_opt_type(resend_on_timeout) ->
    econf:either(
      if_offline,
      econf:bool());
mod_opt_type(queue_type) ->
    econf:queue_type().
116✔
937

938
%% Default values of the module options (gen_mod callback).
%% 'cache_size' and 'queue_type' default to the corresponding top-level
%% ejabberd options for Host.
mod_options(Host) ->
    DefaultCacheSize = ejabberd_option:cache_size(Host),
    DefaultQueueType = ejabberd_option:queue_type(Host),
    [{max_ack_queue, 5000},
     {resume_timeout, timer:seconds(300)},
     {max_resume_timeout, undefined},
     {ack_timeout, timer:seconds(60)},
     {cache_size, DefaultCacheSize},
     {cache_life_time, timer:hours(48)},
     {resend_on_timeout, false},
     {queue_type, DefaultQueueType}].
947

948
%% Documentation of the module and its options (gen_mod callback).
%% Returns a map with the module description under 'desc' and one entry
%% per option under 'opts'. The stated defaults mirror mod_options/1.
mod_doc() ->
    #{desc =>
          ?T("This module adds support for "
             "https://xmpp.org/extensions/xep-0198.html"
             "[XEP-0198: Stream Management]. This protocol allows "
             "active management of an XML stream between two XMPP "
             "entities, including features for stanza acknowledgments "
             "and stream resumption."),
      opts =>
          [{max_ack_queue,
            #{value => ?T("Size"),
              desc =>
                  ?T("This option specifies the maximum number of "
                     "unacknowledged stanzas queued for possible "
                     "retransmission. When the limit is exceeded, "
                     "the client session is terminated. The allowed "
                     "values are positive integers and 'infinity'. "
                     "You should be careful when setting this value "
                     "as it should not be set too low, otherwise, "
                     "you could kill sessions in a loop, before they "
                     "get the chance to finish proper session initiation. "
                     "It should definitely be set higher than the size "
                     "of the offline queue (for example at least 3 times "
                     "the value of the max offline queue and never lower "
                     "than '1000'). The default value is '5000'.")}},
           {resume_timeout,
            #{value => "timeout()",
              desc =>
                  ?T("This option configures the (default) period of time "
                     "until a session times out if the connection is lost. "
                     "During this period of time, a client may resume its "
                     "session. Note that the client may request a different "
                     "timeout value, see the 'max_resume_timeout' option. "
                     "Setting it to '0' effectively disables session resumption. "
                     "The default value is '5' minutes.")}},
           {max_resume_timeout,
            #{value => "timeout()",
              desc =>
                  ?T("A client may specify the period of time until a session "
                     "times out if the connection is lost. During this period "
                     "of time, the client may resume its session. This option "
                     "limits the period of time a client is permitted to request. "
                     "It must be set to a timeout equal to or larger than the "
                     "default 'resume_timeout'. By default, it is set to the "
                     "same value as the 'resume_timeout' option.")}},
           {ack_timeout,
            #{value => "timeout()",
              desc =>
                  ?T("A time to wait for stanza acknowledgments. "
                     "Setting it to 'infinity' effectively disables the timeout. "
                     "The default value is '1' minute.")}},
           {resend_on_timeout,
            #{value => "true | false | if_offline",
              desc =>
                  ?T("If this option is set to 'true', any message stanzas "
                     "that weren't acknowledged by the client will be resent "
                     "on session timeout. This behavior might often be desired, "
                     "but could have unexpected results under certain circumstances. "
                     "For example, a message that was sent to two resources might "
                     "get resent to one of them if the other one timed out. "
                     "Therefore, the default value for this option is 'false', "
                     "which tells ejabberd to generate an error message instead. "
                     "As an alternative, the option may be set to 'if_offline'. "
                     "In this case, unacknowledged messages are resent only if "
                     "no other resource is online when the session times out. "
                     "Otherwise, error messages are generated.")}},
           {queue_type,
            #{value => "ram | file",
              desc =>
                  ?T("Same as top-level _`queue_type`_ option, but applied to this module only.")}},
           {cache_size,
            #{value => "pos_integer() | infinity",
              desc =>
                  ?T("Same as top-level _`cache_size`_ option, but applied to this module only.")}},
           {cache_life_time,
            #{value => "timeout()",
              desc =>
                  ?T("Same as top-level _`cache_life_time`_ option, "
                     "but applied to this module only. "
                     "The default value is '48 hours'.")}}]}.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc