• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

esl / MongooseIM / 4698755488

14 Apr 2023 10:23AM UTC coverage: 81.909% (+0.4%) from 81.533%
4698755488

push

github

GitHub
Merge pull request #4002 from esl/measured-sql-requests

9 of 9 new or added lines in 1 file covered. (100.0%)

27696 of 33813 relevant lines covered (81.91%)

27956.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.57
/src/inbox/mod_inbox_rdbms_async.erl
1
-module(mod_inbox_rdbms_async).
2

3
-include("mod_inbox.hrl").
4
-include("mongoose_logger.hrl").
5

6
-behaviour(mod_inbox_backend).
7
-behaviour(mongoose_aggregator_worker).
8

9
-ifdef(gen_server_request_id).
10
-type request_id() :: gen_server:request_id().
11
-else.
12
-type request_id() :: term().
13
-endif.
14

15
-type box() :: binary().
16
-type task() ::
17
    {set_inbox, mod_inbox:entry_key(), exml:element(), pos_integer(), id(), integer(), box()} |
18
    {set_inbox_incr_unread, mod_inbox:entry_key(), exml:element(), id(), integer(), pos_integer(), box()} |
19
    {remove_inbox_row, mod_inbox:entry_key()} |
20
    {reset_unread, mod_inbox:entry_key(), id(), integer()}.
21

22
%% API
23
-export([init/2,
24
         set_inbox/6,
25
         set_inbox_incr_unread/5,
26
         reset_unread/4,
27
         remove_inbox_row/2,
28
         empty_user_bin/4,
29
         empty_domain_bin/3,
30
         empty_global_bin/2,
31
         remove_domain/2,
32
         clear_inbox/3,
33
         get_inbox/4,
34
         get_inbox_unread/2,
35
         get_full_entry/2,
36
         get_entry_properties/2,
37
         set_entry_properties/3]).
38
-export([stop/1]).
39

40
%% Worker callbacks
41
-export([request/2, aggregate/3, verify/3]).
42

43
%% Initialisation
44
-spec init(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
45
init(HostType, Opts) ->
46
    AsyncOpts = prepare_pool_opts(Opts),
136✔
47
    mod_inbox_rdbms:init(HostType, Opts),
136✔
48
    prepare_deletes(HostType, Opts),
136✔
49
    start_pool(HostType, AsyncOpts),
136✔
50
    ok.
136✔
51

52
stop(HostType) ->
53
    mongoose_async_pools:stop_pool(HostType, inbox).
136✔
54

55
prepare_pool_opts(#{async_writer := AsyncOpts}) ->
56
    AsyncOpts#{pool_type => aggregate,
136✔
57
               request_callback => fun ?MODULE:request/2,
58
               aggregate_callback => fun ?MODULE:aggregate/3,
59
               verify_callback => fun ?MODULE:verify/3}.
60

61
prepare_deletes(_HostType, _Opts) ->
62
    mongoose_rdbms:prepare(inbox_move_conversation_to_bin, inbox,
136✔
63
                           [luser, lserver, remote_bare_jid],
64
                           <<"UPDATE inbox SET box='bin'",
65
                             " WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?">>),
66
    ok.
136✔
67

68
-spec start_pool(mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term().
69
start_pool(HostType, Opts) ->
70
    mongoose_async_pools:start_pool(HostType, inbox, Opts).
136✔
71

72
%% Worker callbacks
73
-spec request(task(), mongoose_async_pools:pool_extra()) -> request_id().
74
request(Task, _Extra = #{host_type := HostType}) ->
75
    request_one(HostType, Task).
3,699✔
76

77
request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid}, Packet, Count, MsgId, Timestamp, Box}) ->
78
    Content = exml:to_binary(Packet),
834✔
79
    Unique = [LUser, LServer, LToBareJid],
834✔
80
    Update = [MsgId, Box, Content, Count, Timestamp],
834✔
81
    Insert = [LUser, LServer, LToBareJid, MsgId, Box, Content, Count, Timestamp],
834✔
82
    rdbms_queries:request_upsert(HostType, inbox_upsert, Insert, Update, Unique);
834✔
83
request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid}, Packet, MsgId, Timestamp, Incrs, Box}) ->
84
    Content = exml:to_binary(Packet),
1,832✔
85
    Unique = [LUser, LServer, LToBareJid],
1,832✔
86
    Update = [MsgId, Box, Content, Incrs, Timestamp],
1,832✔
87
    Insert = [LUser, LServer, LToBareJid, MsgId, Box, Content, Incrs, Timestamp],
1,832✔
88
    rdbms_queries:request_upsert(HostType, inbox_upsert_incr_unread, Insert, Update, Unique);
1,832✔
89
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, undefined, TS}) ->
90
    mongoose_rdbms:execute_request(HostType, inbox_reset_unread, [LUser, LServer, LToBareJid, TS]);
18✔
91
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, MsgId, TS}) ->
92
    mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg, [LUser, LServer, LToBareJid, MsgId, TS]);
705✔
93
request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) ->
94
    mongoose_rdbms:execute_request(HostType, inbox_move_conversation_to_bin, [LUser, LServer, LToBareJid]).
310✔
95

96
-spec aggregate(task(), task(), mongoose_async_pools:pool_extra()) -> {ok, task()}.
97
aggregate(Current, NewTask, _Extra) ->
98
    {ok, aggregate(Current, NewTask)}.
113✔
99

100
-spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok | {error, term()}.
101
verify(Answer, InboxTask, _Extra) ->
102
    case mod_inbox_rdbms:check_result(Answer) of
3,699✔
103
        {error, Reason} ->
104
            {LU, LS, LRem} = element(2, InboxTask),
×
105
            ?LOG_WARNING(#{what => inbox_process_message_failed, reason => Reason,
×
106
                           from_jid => jid:to_binary({LU, LS}), to_jid => LRem}),
×
107
            {error, Reason};
×
108
        _ -> ok
3,699✔
109
    end.
110

111
%% async callbacks
112
-spec set_inbox(mongooseim:host_type(), mod_inbox:entry_key(),
113
                exml:element(), Count :: integer(), id(), Timestamp :: integer()) ->
114
    mod_inbox:write_res().
115
set_inbox(HostType, Entry, Packet, Count, MsgId, Timestamp) ->
116
    Params = {set_inbox, Entry, Packet, Count, MsgId, Timestamp, <<"inbox">>},
786✔
117
    mongoose_async_pools:put_task(HostType, inbox, Entry, Params).
786✔
118

119
-spec set_inbox_incr_unread(mongooseim:host_type(), mod_inbox:entry_key(),
120
                            exml:element(), MsgId :: binary(), Timestamp :: integer()) ->
121
    mod_inbox:count_res().
122
set_inbox_incr_unread(HostType, Entry, Packet, MsgId, Timestamp) ->
123
    Params = {set_inbox_incr_unread, Entry, Packet, MsgId, Timestamp, 1, <<"inbox">>},
1,922✔
124
    mongoose_async_pools:put_task(HostType, inbox, Entry, Params).
1,922✔
125

126
-spec reset_unread(mongooseim:host_type(), mod_inbox:entry_key(), binary() | undefined, integer()) ->
127
    mod_inbox:write_res().
128
reset_unread(HostType, Entry, MsgId, TS) ->
129
    Params = {reset_unread, Entry, MsgId, TS},
792✔
130
    mongoose_async_pools:put_task(HostType, inbox, Entry, Params).
792✔
131

132
-spec remove_inbox_row(mongooseim:host_type(), mod_inbox:entry_key()) -> mod_inbox:write_res().
133
remove_inbox_row(HostType, Entry) ->
134
    Params = {remove_inbox_row, Entry},
312✔
135
    mongoose_async_pools:put_task(HostType, inbox, Entry, Params).
312✔
136

137
%% synchronous callbacks
138
-spec get_inbox(mongooseim:host_type(), jid:luser(), jid:lserver(), mod_inbox:get_inbox_params()) ->
139
    [mod_inbox:inbox_res()].
140
get_inbox(HostType, LUser, LServer, Params) ->
141
    mod_inbox_rdbms:get_inbox(HostType, LUser, LServer, Params).
4,458✔
142

143
-spec get_inbox_unread(mongooseim:host_type(), mod_inbox:entry_key()) ->
144
    {ok, integer()}.
145
get_inbox_unread(HostType, Entry) ->
146
    mod_inbox_rdbms:get_inbox_unread(HostType, Entry).
×
147

148
-spec remove_domain(mongooseim:host_type(), jid:lserver()) -> term().
149
remove_domain(HostType, LServer) ->
150
    mod_inbox_rdbms:remove_domain(HostType, LServer).
×
151

152
-spec clear_inbox(mongooseim:host_type(), jid:luser(), jid:lserver()) ->
153
    mod_inbox:write_res().
154
clear_inbox(HostType, LUser, LServer) ->
155
    mod_inbox_rdbms:clear_inbox(HostType, LUser, LServer).
2,244✔
156

157
-spec get_full_entry(mongooseim:host_type(), mod_inbox:entry_key()) ->
158
    inbox_res() | nil().
159
get_full_entry(HostType, Entry) ->
160
    mod_inbox_rdbms:get_full_entry(HostType, Entry).
6✔
161

162
-spec get_entry_properties(mongooseim:host_type(), mod_inbox:entry_key()) ->
163
    entry_properties() | nil().
164
get_entry_properties(HostType, Entry) ->
165
    mod_inbox_rdbms:get_entry_properties(HostType, Entry).
6✔
166

167
-spec set_entry_properties(mongooseim:host_type(), mod_inbox:entry_key(), entry_properties()) ->
168
    entry_properties() | {error, binary()}.
169
set_entry_properties(HostType, Entry, Properties) ->
170
    mod_inbox_rdbms:set_entry_properties(HostType, Entry, Properties).
306✔
171

172
-spec empty_user_bin(HostType :: mongooseim:host_type(),
173
                     LServer :: jid:lserver(),
174
                     LUser :: jid:luser(),
175
                     TS :: integer()) -> non_neg_integer().
176
empty_user_bin(HostType, LServer, LUser, TS) ->
177
    mod_inbox_rdbms:empty_user_bin(HostType, LServer, LUser, TS).
48✔
178

179
-spec empty_domain_bin(HostType :: mongooseim:host_type(),
180
                       LServer :: jid:lserver(),
181
                       TS :: integer()) -> non_neg_integer().
182
empty_domain_bin(HostType, LServer, TS) ->
183
    mod_inbox_rdbms:empty_domain_bin(HostType, LServer, TS).
42✔
184

185
-spec empty_global_bin(HostType :: mongooseim:host_type(),
186
                       TS :: integer()) -> non_neg_integer().
187
empty_global_bin(HostType, TS) ->
188
    mod_inbox_rdbms:empty_global_bin(HostType, TS).
112✔
189

190
-spec aggregate(CurrentlyAccumulatedTask :: task(), NewTask :: task()) -> FinalTask :: task().
191
%%% if new task is remove_row, do the previous with an updated box
192
aggregate({set_inbox, Entry, Content, Count, MsgId, Timestamp, _},
193
          {remove_inbox_row, _}) ->
194
    {set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>};
×
195
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _},
196
          {remove_inbox_row, _}) ->
197
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>};
2✔
198
aggregate(_, {remove_inbox_row, _} = NewTask) ->
199
    NewTask;
×
200

201
%%% if the last task was remove_row, this task should now only be an insert
202
aggregate({remove_inbox_row, _} = OldTask, {reset_unread, _, _, _}) ->
203
    OldTask;
×
204
aggregate({remove_inbox_row, _},
205
          {set_inbox, Entry, Content, Count, MsgId, Timestamp, _}) ->
206
    {set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>};
×
207
aggregate({remove_inbox_row, _},
208
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _}) ->
209
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>};
×
210

211
%%% If the last task was a reset_unread,
212
%   we prefer explicit resets,
213
%   then adhoc newer resets,
214
%   then we accumulate inserts
215
%% an undefined means an explicit request to reset, it has priority
216
aggregate({reset_unread, _, _, _}, {reset_unread, _, undefined, _} = NewTask) ->
217
    NewTask;
×
218
%% an undefined means an explicit request to reset, it has priority
219
aggregate({reset_unread, _, undefined, _} = OldTask, {reset_unread, _, _, _}) ->
220
    OldTask;
×
221
%% both are adhoc, we prefer the newer
222
aggregate({reset_unread, _, _, _}, {reset_unread, _, _, _} = NewTask) ->
223
    NewTask;
×
224
aggregate({reset_unread, _, _, _}, {set_inbox, _, _, _, _, _, _} = NewTask) ->
225
    NewTask;
×
226
%% Here `Count` becomes an absolute value instead of an increment
227
aggregate({reset_unread, _, _, _},
228
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) ->
229
    {set_inbox, Entry, Content, Incrs, MsgId, Timestamp, Box};
5✔
230

231
%%% If the last task was a set_inbox
232
%% Reset is an explicit reset-to-zero, so do reset the counter
233
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box},
234
          {reset_unread, _, undefined, _}) ->
235
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
×
236
%% Reset refers to that same set_inbox
237
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box},
238
          {reset_unread, _, MsgId, _}) ->
239
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
×
240
%% Reset refers to some other set_inbox
241
aggregate({set_inbox, _, _, _, _, _, _} = OldTask,
242
          {reset_unread, _, _, _}) ->
243
    OldTask;
×
244
aggregate({set_inbox, _, _, Count, _, _, _, _},
245
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) ->
246
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Count + Incrs, Box};
×
247

248
%%% If the last task was a set_inbox_incr_unread
249
% we're resetting on this message:
250
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _, Box},
251
          {reset_unread, _, MsgId, _}) ->
252
    {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
64✔
253
aggregate({set_inbox_incr_unread, _, _, _, _, _, _} = OldTask,
254
          {reset_unread, _, _, _}) ->
255
    OldTask;
×
256
% prefer newest row, but accumulate increment
257
aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2, _},
258
          {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1, Box}) ->
259
    {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2, Box};
19✔
260

261
aggregate({set_inbox_incr_unread, _, _, MsgId, _, _, _},
262
          {set_inbox, _, _, _, MsgId, _, _} = NewTask) ->
263
    NewTask;
×
264

265
aggregate(_OldTask, NewTask) ->
266
    NewTask.
23✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc