• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

processone / ejabberd / 1258

12 Dec 2025 03:57PM UTC coverage: 33.638% (-0.006%) from 33.644%
1258

push

github

badlop
Container: Apply commit a22c88a

ejabberdctl.template: Show meaningful error when ERL_DIST_PORT is in use

15554 of 46240 relevant lines covered (33.64%)

1078.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.67
/src/mod_mam_mnesia.erl
1
%%%-------------------------------------------------------------------
2
%%% File    : mod_mam_mnesia.erl
3
%%% Author  : Evgeny Khramtsov <ekhramtsov@process-one.net>
4
%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
5
%%%
6
%%%
7
%%% ejabberd, Copyright (C) 2002-2025   ProcessOne
8
%%%
9
%%% This program is free software; you can redistribute it and/or
10
%%% modify it under the terms of the GNU General Public License as
11
%%% published by the Free Software Foundation; either version 2 of the
12
%%% License, or (at your option) any later version.
13
%%%
14
%%% This program is distributed in the hope that it will be useful,
15
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
16
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
%%% General Public License for more details.
18
%%%
19
%%% You should have received a copy of the GNU General Public License along
20
%%% with this program; if not, write to the Free Software Foundation, Inc.,
21
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22
%%%
23
%%%----------------------------------------------------------------------
24

25
-module(mod_mam_mnesia).
26

27
-behaviour(mod_mam).
28

29
%% API
30
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
31
         extended_fields/1, store/10, write_prefs/4, get_prefs/2, select/6,
32
         remove_from_archive/3,
33
         is_empty_for_user/2, is_empty_for_room/3, delete_old_messages_batch/5,
34
         transform/1]).
35

36
-include_lib("stdlib/include/ms_transform.hrl").
37
-include_lib("xmpp/include/xmpp.hrl").
38
-include("logger.hrl").
39
-include("mod_mam.hrl").
40

41
%% Ordering helpers for binaries: a binary with more bytes is considered
%% greater; binaries of equal byte size fall back to the standard term
%% comparison. store/10 writes ids with integer_to_binary/1, so for such
%% ids this presumably reproduces numeric ordering without converting.
-define(BIN_GREATER_THAN(X, Y),
        ((X > Y andalso byte_size(X) == byte_size(Y))
         orelse byte_size(X) > byte_size(Y))).
-define(BIN_LESS_THAN(X, Y),
        ((X < Y andalso byte_size(X) == byte_size(Y))
         orelse byte_size(X) < byte_size(Y))).

%% Threshold that store/10 compares against
%% mnesia:table_info(archive_msg, memory); a bit less than 2 GiB.
-define(TABLE_SIZE_LIMIT, 2000000000).
49

50
%%%===================================================================
51
%%% API
52
%%%===================================================================
53
%% Create the archive_msg (bag) and archive_prefs tables as
%% disc_only_copies on the local node.
%% Returns ok when both creations yield {atomic, _}; a failed match on
%% either result is turned into {error, db_failure}.
init(_Host, _Opts) ->
    MsgTableOpts = [{disc_only_copies, [node()]},
                    {type, bag},
                    {attributes, record_info(fields, archive_msg)}],
    PrefsTableOpts = [{disc_only_copies, [node()]},
                      {attributes, record_info(fields, archive_prefs)}],
    try
        {atomic, _} = ejabberd_mnesia:create(?MODULE, archive_msg,
                                             MsgTableOpts),
        {atomic, _} = ejabberd_mnesia:create(?MODULE, archive_prefs,
                                             PrefsTableOpts),
        ok
    catch
        _:{badmatch, _} ->
            {error, db_failure}
    end.
68

69
%% Delete every archived message and the stored preferences kept under
%% the key {LUser, LServer}, inside a single mnesia transaction.
%% Returns the raw mnesia:transaction/1 result.
remove_user(LUser, LServer) ->
    Key = {LUser, LServer},
    mnesia:transaction(
      fun() ->
              lists:foreach(fun(Table) -> mnesia:delete({Table, Key}) end,
                            [archive_msg, archive_prefs])
      end).
283✔
76

77
%% A room archive is stored under the key {LName, LHost}, so removing it
%% is delegated to remove_user/2. The first argument is ignored.
remove_room(_LServer, RoomName, RoomHost) ->
    remove_user(RoomName, RoomHost).
222✔
79

80
%% Remove messages from one archive.
%%
%% The archive is addressed either by a binary user name plus host
%% (converted to the {LUser, LHost} key) or directly by that key tuple.
%% The third argument selects what is removed:
%%   none     - every message stored under the key;
%%   #jid{}   - messages whose bare_peer equals the resource-less JID;
%%   StanzaId - messages whose timestamp equals
%%              misc:usec_to_now(StanzaId).
%% Returns ok, or {error, Reason} when the transaction aborts.
remove_from_archive(LUser, LHost, Key) when is_binary(LUser) ->
    remove_from_archive({LUser, LHost}, LHost, Key);
remove_from_archive(US, _LServer, none) ->
    DeleteAll = fun() -> mnesia:delete({archive_msg, US}) end,
    archive_txn_result(mnesia:transaction(DeleteAll));
remove_from_archive(US, _LServer, #jid{} = WithJid) ->
    Peer = jid:remove_resource(jid:split(WithJid)),
    MatchSpec = ets:fun2ms(
                  fun(#archive_msg{us = US1, bare_peer = Peer1} = Msg)
                        when US1 == US, Peer1 == Peer -> Msg
                  end),
    delete_archive_matches(MatchSpec);
remove_from_archive(US, _LServer, StanzaId) ->
    Timestamp = misc:usec_to_now(StanzaId),
    MatchSpec = ets:fun2ms(
                  fun(#archive_msg{us = US1, timestamp = Timestamp1} = Msg)
                        when US1 == US, Timestamp1 == Timestamp -> Msg
                  end),
    delete_archive_matches(MatchSpec).

%% Select the archive_msg records matching MatchSpec and delete each of
%% them, all within one transaction.
delete_archive_matches(MatchSpec) ->
    DeleteMatches =
        fun() ->
                Msgs = mnesia:select(archive_msg, MatchSpec),
                lists:foreach(fun mnesia:delete_object/1, Msgs)
        end,
    archive_txn_result(mnesia:transaction(DeleteMatches)).

%% Map a mnesia:transaction/1 result onto ok | {error, Reason}.
archive_txn_result({atomic, _}) -> ok;
archive_txn_result({aborted, Reason}) -> {error, Reason}.
117

118
%% Delete archived messages older than TimeStamp across all archives.
%% Type is passed on to delete_old_user_messages/3, which treats `all`
%% as "any message type".
%%
%% The table is switched to disc_copies for the duration of the sweep
%% (presumably to speed up the per-key rewrite) and switched back to
%% disc_only_copies afterwards. The switch back runs in an `after`
%% section so that an exception raised during the sweep cannot leave
%% the table in the temporary storage type.
%%
%% Returns whatever delete_old_user_messages/3 returns.
delete_old_messages(global, TimeStamp, Type) ->
    mnesia:change_table_copy_type(archive_msg, node(), disc_copies),
    try
        delete_old_user_messages(mnesia:dirty_first(archive_msg),
                                 TimeStamp, Type)
    after
        mnesia:change_table_copy_type(archive_msg, node(), disc_only_copies)
    end.
×
123

124
%% Walk the archive_msg keys starting at User and, for each key, drop
%% the messages that are older than TimeStamp and whose type matches
%% Type (`all` matches every type).
%%
%% Returns ok once '$end_of_table' is reached. If a transaction aborts,
%% the walk stops and the abort reason is logged and returned as is.
delete_old_user_messages('$end_of_table', _TimeStamp, _Type) ->
    ok;
delete_old_user_messages(User, TimeStamp, Type) ->
    F = fun() ->
                Msgs = mnesia:read(archive_msg, User),
                %% A message is kept when it is recent enough, or when a
                %% specific Type was requested and the message has a
                %% different type.
                Keep = lists:filter(
                         fun(#archive_msg{timestamp = MsgTS,
                                          type = MsgType}) ->
                                 MsgTS >= TimeStamp orelse (Type /= all andalso
                                                            Type /= MsgType)
                         end, Msgs),
                %% Only touch the table when something has to go: the
                %% whole key is deleted and the survivors written back.
                if length(Keep) < length(Msgs) ->
                        mnesia:delete({archive_msg, User}),
                        lists:foreach(fun(Msg) -> mnesia:write(Msg) end, Keep);
                   true ->
                        ok
                end
        end,
    %% The successor key is looked up before the transaction runs, since
    %% the transaction may delete the current key.
    NextRecord = mnesia:dirty_next(archive_msg, User),
    case mnesia:transaction(F) of
        {atomic, ok} ->
            delete_old_user_messages(NextRecord, TimeStamp, Type);
        {aborted, Err} ->
            %% Err is an arbitrary term, so it is printed with ~p;
            %% ~ts only accepts character data.
            ?ERROR_MSG("Cannot delete old MAM messages: ~p", [Err]),
            Err
    end.
150

151
%% Delete up to Budget messages older than TS (and of type Type, where
%% `all` matches every type) from archives whose key has LServer as its
%% second element, walking the table from the given key onwards.
%% `none` as the key means "start at the first key".
%%
%% Returns {Remaining, Key}: the part of the budget that was not used
%% and the key at which the walk stopped ('$end_of_table' when the
%% table was exhausted). Must be called inside a mnesia transaction,
%% as it uses mnesia:first/next/wread/delete_object.
delete_batch('$end_of_table', _LServer, _TS, _Type, Budget) ->
    {Budget, '$end_of_table'};
delete_batch(Key, _LServer, _TS, _Type, 0) ->
    {0, Key};
delete_batch(none, LServer, TS, Type, Budget) ->
    delete_batch(mnesia:first(archive_msg), LServer, TS, Type, Budget);
delete_batch({_, KeyServer} = Key, LServer, TS, Type, Budget)
  when LServer /= KeyServer ->
    %% Key belongs to another server: skip it.
    delete_batch(mnesia:next(archive_msg, Key), LServer, TS, Type, Budget);
delete_batch(Key, LServer, TS, Type, Budget) ->
    DeleteIfOld =
        fun(_, 0) ->
                0;
           (#archive_msg{timestamp = MsgTS, type = MsgType} = Msg, Quota)
              when MsgTS < TS, (Type == all orelse Type == MsgType) ->
                mnesia:delete_object(Msg),
                Quota - 1;
           (_, Quota) ->
                Quota
        end,
    Stored = mnesia:wread({archive_msg, Key}),
    case lists:foldl(DeleteIfOld, Budget, Stored) of
        0 ->
            {0, Key};
        Unused ->
            delete_batch(mnesia:next(archive_msg, Key),
                         LServer, TS, Type, Unused)
    end.
174

175
%% Run one batch of old-message deletion inside a transaction.
%% LastUS is the key to resume from (as returned by a previous call).
%% Returns {ok, NextUS, Deleted}, where Deleted is Batch minus the
%% unused budget reported by delete_batch/5, or {error, Reason} when the
%% transaction aborts.
delete_old_messages_batch(LServer, TimeStamp, Type, Batch, LastUS) ->
    RunBatch =
        fun() ->
                {Unused, NextUS} =
                    delete_batch(LastUS, LServer, TimeStamp, Type, Batch),
                {Batch - Unused, NextUS}
        end,
    case mnesia:transaction(RunBatch) of
        {atomic, {Deleted, NextState}} ->
            {ok, NextState, Deleted};
        {aborted, Reason} ->
            {error, Reason}
    end.
187

188
%% This backend reports no extended fields: the result is always the
%% empty list, whatever the argument.
extended_fields(_Ignored) ->
    [].
12✔
190

191
%% Store one message in the archive of {LUser, LServer}.
%%
%% When Retract is {true, RID}, messages already archived under the
%% same key whose bare_peer equals the bare form of Peer and whose
%% origin_id equals RID are deleted first; the result of that
%% transaction is not inspected.
%%
%% Returns ok on success, {error, overflow} when archive_msg has
%% disc_only_copies replicas and its reported memory exceeds
%% ?TABLE_SIZE_LIMIT, or the bare abort reason when the write aborts.
store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir, TS,
      OriginID, Retract) ->
    case Retract of
        {true, RID} ->
            mnesia:transaction(
              fun () ->
                      {PUser, PServer, _} = jid:tolower(Peer),
                      Msgs = mnesia:select(
                               archive_msg,
                               ets:fun2ms(
                                 fun(#archive_msg{
                                        us = US1,
                                        bare_peer = Peer1,
                                        origin_id = OriginID1} = Msg)
                                       when US1 == {LUser, LServer},
                                            Peer1 == {PUser, PServer, <<>>},
                                            OriginID1 == RID -> Msg
                                 end)),
                      lists:foreach(fun mnesia:delete_object/1, Msgs)
              end);
        false -> ok
    end,
    case {mnesia:table_info(archive_msg, disc_only_copies),
          mnesia:table_info(archive_msg, memory)} of
        {[_|_], TableSize} when TableSize > ?TABLE_SIZE_LIMIT ->
            ?ERROR_MSG("MAM archives too large, won't store message for ~ts@~ts",
                       [LUser, LServer]),
            {error, overflow};
        _ ->
            LPeer = {PUser, PServer, _} = jid:tolower(Peer),
            F = fun() ->
                        mnesia:write(
                          #archive_msg{us = {LUser, LServer},
                                       id = integer_to_binary(TS),
                                       timestamp = misc:usec_to_now(TS),
                                       peer = LPeer,
                                       bare_peer = {PUser, PServer, <<>>},
                                       type = Type,
                                       nick = Nick,
                                       packet = Pkt,
                                       origin_id = OriginID})
                end,
            case mnesia:transaction(F) of
                {atomic, ok} ->
                    ok;
                {aborted, Err} ->
                    %% Err is an arbitrary term, so it is printed with
                    %% ~p; ~ts only accepts character data.
                    ?ERROR_MSG("Cannot add message to MAM archive of ~ts@~ts: ~p",
                               [LUser, LServer, Err]),
                    Err
            end
    end.
242

243
%% Persist the given preferences term with a dirty write; the user,
%% server and server-host arguments are not used by this backend.
write_prefs(_User, _Server, Prefs, _Host) ->
    mnesia:dirty_write(Prefs).
322✔
245

246
%% Look up the archive preferences stored under {LUser, LServer}.
%% Returns {ok, Prefs} when exactly one entry is found, `error`
%% otherwise.
get_prefs(LUser, LServer) ->
    Key = {LUser, LServer},
    case mnesia:dirty_read(archive_prefs, Key) of
        [Found] -> {ok, Found};
        _ -> error
    end.
253

254
%% Query the archive of JidArchive.
%%
%% Query is a property list from which `start`, `end` and `with` are
%% read; RSM is passed to filter_by_rsm/2 for paging.
%% Returns {Entries, IsComplete, Count} where each entry is
%% {Id, binary_to_integer(Id), El}, El being the element produced by
%% mod_mam:msg_to_el/4 (messages for which it returns {error, _} are
%% left out), and Count is the number of messages matched before
%% paging.
select(_LServer, JidRequestor,
       #jid{luser = LUser, lserver = LServer} = JidArchive,
       Query, RSM, MsgType) ->
    Start = proplists:get_value(start, Query),
    End = proplists:get_value('end', Query),
    LWith = case proplists:get_value(with, Query) of
                undefined -> undefined;
                With -> jid:tolower(With)
            end,
    MatchSpec = make_matchspec(LUser, LServer, Start, End, LWith),
    Matched = mnesia:dirty_select(archive_msg, MatchSpec),
    Sorted = lists:keysort(#archive_msg.timestamp, Matched),
    {Page, IsComplete} = filter_by_rsm(Sorted, RSM),
    Count = length(Matched),
    ToEntry =
        fun(Msg) ->
                case mod_mam:msg_to_el(Msg, MsgType,
                                       JidRequestor, JidArchive) of
                    {ok, El} ->
                        Id = Msg#archive_msg.id,
                        [{Id, binary_to_integer(Id), El}];
                    {error, _} ->
                        []
                end
        end,
    Result = {lists:flatmap(ToEntry, Page), IsComplete, Count},
    %% Explicit collection after building the result, as the matched
    %% message list may be large.
    erlang:garbage_collect(),
    Result.
404✔
282

283
%% true when no archived message is stored under {LUser, LServer}.
is_empty_for_user(LUser, LServer) ->
    case mnesia:dirty_read(archive_msg, {LUser, LServer}) of
        [] -> true;
        _ -> false
    end.
×
285

286
%% Room archives are keyed by {LName, LHost}, so the check is delegated
%% to is_empty_for_user/2. The first argument is ignored.
is_empty_for_room(_LServer, RoomName, RoomHost) ->
    is_empty_for_user(RoomName, RoomHost).
×
288

289
%%%===================================================================
290
%%% Internal functions
291
%%%===================================================================
292
%% Build the match specification used by select/6.
%%
%% The specification returns whole archive_msg records stored under
%% {LUser, LServer} whose timestamp lies within [Start, End].
%% With narrows the result further:
%%   {_, _, <<>>} - bare_peer must equal With;
%%   {_, _, _}    - peer must equal With;
%%   undefined    - no peer restriction.
make_matchspec(LUser, LServer, Start, undefined, With) ->
    %% List is always greater than a tuple, so [] serves as an open
    %% upper bound for the End >= TS comparison.
    make_matchspec(LUser, LServer, Start, [], With);
make_matchspec(LUser, LServer, Start, End, {_, _, <<>>} = With) ->
    ets:fun2ms(
      fun(#archive_msg{timestamp = TS,
                       us = US,
                       bare_peer = BPeer} = Msg)
            when Start =< TS, End >= TS,
                 US == {LUser, LServer},
                 BPeer == With ->
              Msg
      end);
make_matchspec(LUser, LServer, Start, End, {_, _, _} = With) ->
    ets:fun2ms(
      fun(#archive_msg{timestamp = TS,
                       us = US,
                       peer = Peer} = Msg)
            when Start =< TS, End >= TS,
                 US == {LUser, LServer},
                 Peer == With ->
              Msg
      end);
make_matchspec(LUser, LServer, Start, End, undefined) ->
    %% No peer restriction here, so the peer field is not bound.
    ets:fun2ms(
      fun(#archive_msg{timestamp = TS,
                       us = US} = Msg)
            when Start =< TS, End >= TS,
                 US == {LUser, LServer} ->
              Msg
      end).
324

325
%% Apply result-set paging to a list of archive_msg records.
%% Returns {Page, IsComplete}.
%%
%% Without an rsm_set the whole list is returned as complete; a
%% negative max yields an empty, complete page. Otherwise the list is
%% narrowed by `after`/`before` (see rsm_page/3) and then cut to `max`
%% entries by filter_by_max/2.
filter_by_rsm(Msgs, undefined) ->
    {Msgs, true};
filter_by_rsm(_Msgs, #rsm_set{max = Max}) when Max < 0 ->
    {[], true};
filter_by_rsm(Msgs, #rsm_set{max = Max, before = Before, 'after' = After}) ->
    filter_by_max(rsm_page(Msgs, Before, After), Max).

%% Narrow Msgs according to the `before` / `after` markers:
%%   non-empty After  - keep messages whose id is greater than After,
%%                      in their original order;
%%   non-empty Before - keep messages whose id is less than Before,
%%                      in reversed order;
%%   empty Before     - the whole list, reversed;
%%   anything else    - the list unchanged.
%% A non-empty After takes precedence over Before.
rsm_page(Msgs, _Before, After) when is_binary(After), After /= <<"">> ->
    lists:filter(
      fun(#archive_msg{id = Id}) ->
              ?BIN_GREATER_THAN(Id, After)
      end, Msgs);
rsm_page(Msgs, Before, _After) when is_binary(Before), Before /= <<"">> ->
    lists:foldl(
      fun(#archive_msg{id = Id} = Msg, Acc)
            when ?BIN_LESS_THAN(Id, Before) ->
              [Msg | Acc];
         (_, Acc) ->
              Acc
      end, [], Msgs);
rsm_page(Msgs, <<"">>, _After) ->
    lists:reverse(Msgs);
rsm_page(Msgs, _Before, _After) ->
    Msgs.
392✔
349

350
%% Cut a list down to at most Max entries.
%% Returns {Page, IsComplete}: with Max `undefined` the list is returned
%% untouched and complete; with a non-negative integer the first Max
%% entries are returned and IsComplete tells whether the list fitted
%% entirely; any other Max yields an empty, complete page.
filter_by_max(Msgs, Max) ->
    if Max =:= undefined ->
            {Msgs, true};
       is_integer(Max), Max >= 0 ->
            Page = lists:sublist(Msgs, Max),
            Fits = length(Msgs) =< Max,
            {Page, Fits};
       true ->
            {[], true}
    end.
×
356

357
%% Convert an archive_msg tuple that has eight fields after the tag
%% (i.e. no origin_id; presumably an older table layout) into the
%% current #archive_msg{} record with an empty origin_id.
%% Any other term is returned unchanged.
transform({archive_msg, US, Id, TS, Peer, BarePeer, Pkt, Nick, Type}) ->
    #archive_msg{us = US,
                 id = Id,
                 timestamp = TS,
                 peer = Peer,
                 bare_peer = BarePeer,
                 packet = Pkt,
                 nick = Nick,
                 type = Type,
                 origin_id = <<"">>};
transform(Record) ->
    Record.
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc