• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

processone / ejabberd / 1173

28 Oct 2025 11:02AM UTC coverage: 33.768% (-0.02%) from 33.79%
1173

push

github

badlop
CHANGELOG.md: Update to 25.10

15513 of 45940 relevant lines covered (33.77%)

1078.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.0
/src/mod_mqtt_sql.erl
1
%%%-------------------------------------------------------------------
2
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
3
%%% @copyright (C) 2002-2025 ProcessOne, SARL. All Rights Reserved.
4
%%%
5
%%% Licensed under the Apache License, Version 2.0 (the "License");
6
%%% you may not use this file except in compliance with the License.
7
%%% You may obtain a copy of the License at
8
%%%
9
%%%     http://www.apache.org/licenses/LICENSE-2.0
10
%%%
11
%%% Unless required by applicable law or agreed to in writing, software
12
%%% distributed under the License is distributed on an "AS IS" BASIS,
13
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
%%% See the License for the specific language governing permissions and
15
%%% limitations under the License.
16
%%%
17
%%%-------------------------------------------------------------------
18
-module(mod_mqtt_sql).
19
-behaviour(mod_mqtt).
20

21
%% API
22
-export([init/2, publish/6, delete_published/2, lookup_published/2]).
23
-export([list_topics/1]).
24
%% Unsupported backend API
25
-export([init/0]).
26
-export([subscribe/4, unsubscribe/2, find_subscriber/2]).
27
-export([open_session/1, close_session/1, lookup_session/1, get_sessions/2]).
28
-export([sql_schemas/0]).
29

30
-include("logger.hrl").
31
-include("ejabberd_sql_pt.hrl").
32

33
%%%===================================================================
34
%%% API
35
%%%===================================================================
36
init() ->
37
    ?ERROR_MSG("Backend 'sql' is only supported for db_type", []),
×
38
    {error, db_failure}.
×
39

40
init(Host, _Opts) ->
41
    ejabberd_sql_schema:update_schema(Host, ?MODULE, sql_schemas()),
4✔
42
    ok.
4✔
43

44
sql_schemas() ->
45
    [#sql_schema{
4✔
46
        version = 1,
47
        tables =
48
            [#sql_table{
49
                name = <<"mqtt_pub">>,
50
                columns =
51
                    [#sql_column{name = <<"username">>, type = text},
52
                     #sql_column{name = <<"server_host">>, type = text},
53
                     #sql_column{name = <<"resource">>, type = text},
54
                     #sql_column{name = <<"topic">>, type = text},
55
                     #sql_column{name = <<"qos">>, type = smallint},
56
                     #sql_column{name = <<"payload">>, type = blob},
57
                     #sql_column{name = <<"payload_format">>, type = smallint},
58
                     #sql_column{name = <<"content_type">>, type = text},
59
                     #sql_column{name = <<"response_topic">>, type = text},
60
                     #sql_column{name = <<"correlation_data">>, type = blob},
61
                     #sql_column{name = <<"user_properties">>, type = blob},
62
                     #sql_column{name = <<"expiry">>, type = bigint}],
63
                indices = [#sql_index{
64
                              columns = [<<"topic">>, <<"server_host">>],
65
                              unique = true}]}]}].
66

67
publish({U, LServer, R}, Topic, Payload, QoS, Props, ExpiryTime) ->
68
    PayloadFormat = encode_pfi(maps:get(payload_format_indicator, Props, binary)),
×
69
    ResponseTopic = maps:get(response_topic, Props, <<"">>),
×
70
    CorrelationData = maps:get(correlation_data, Props, <<"">>),
×
71
    ContentType = maps:get(content_type, Props, <<"">>),
×
72
    UserProps = encode_props(maps:get(user_property, Props, [])),
×
73
    case ?SQL_UPSERT(LServer, "mqtt_pub",
×
74
                     ["!topic=%(Topic)s",
75
                      "!server_host=%(LServer)s",
76
                      "username=%(U)s",
77
                      "resource=%(R)s",
78
                      "payload=%(Payload)s",
79
                      "qos=%(QoS)d",
80
                      "payload_format=%(PayloadFormat)d",
81
                      "response_topic=%(ResponseTopic)s",
82
                      "correlation_data=%(CorrelationData)s",
83
                      "content_type=%(ContentType)s",
84
                      "user_properties=%(UserProps)s",
85
                      "expiry=%(ExpiryTime)d"]) of
86
        ok -> ok;
×
87
        _Err -> {error, db_failure}
×
88
    end.
89

90
delete_published({_, LServer, _}, Topic) ->
91
    case ejabberd_sql:sql_query(
×
92
           LServer,
93
           ?SQL("delete from mqtt_pub where "
×
94
                "topic=%(Topic)s and %(LServer)H")) of
95
        {updated, _} -> ok;
×
96
        _Err -> {error, db_failure}
×
97
    end.
98

99
lookup_published({_, LServer, _}, Topic) ->
100
    case ejabberd_sql:sql_query(
×
101
           LServer,
102
           ?SQL("select @(payload)s, @(qos)d, @(payload_format)d, "
×
103
                "@(content_type)s, @(response_topic)s, "
104
                "@(correlation_data)s, @(user_properties)s, @(expiry)d "
105
                "from mqtt_pub where topic=%(Topic)s and %(LServer)H")) of
106
        {selected, [{Payload, QoS, PayloadFormat, ContentType,
107
                     ResponseTopic, CorrelationData, EncProps, Expiry}]} ->
108
            try decode_props(EncProps) of
×
109
                UserProps ->
110
                    try decode_pfi(PayloadFormat) of
×
111
                        PFI ->
112
                            Props = #{payload_format_indicator => PFI,
×
113
                                      content_type => ContentType,
114
                                      response_topic => ResponseTopic,
115
                                      correlation_data => CorrelationData,
116
                                      user_property => UserProps},
117
                            {ok, {Payload, QoS, Props, Expiry}}
×
118
                    catch _:badarg ->
119
                            ?ERROR_MSG("Malformed value of 'payload_format' column "
×
120
                                       "for topic '~ts'", [Topic]),
×
121
                            {error, db_failure}
×
122
                    end
123
            catch _:badarg ->
124
                    ?ERROR_MSG("Malformed value of 'user_properties' column "
×
125
                               "for topic '~ts'", [Topic]),
×
126
                    {error, db_failure}
×
127
            end;
128
        {selected, []} ->
129
            {error, notfound};
×
130
        _ ->
131
            {error, db_failure}
×
132
    end.
133

134
list_topics(LServer) ->
135
    case ejabberd_sql:sql_query(
4✔
136
           LServer,
137
           ?SQL("select @(topic)s from mqtt_pub where %(LServer)H")) of
4✔
138
        {selected, Res} ->
139
            {ok, [Topic || {Topic} <- Res]};
4✔
140
        _ ->
141
            {error, db_failure}
×
142
    end.
143

144
open_session(_) ->
145
    erlang:nif_error(unsupported_db).
×
146

147
close_session(_) ->
148
    erlang:nif_error(unsupported_db).
×
149

150
lookup_session(_) ->
151
    erlang:nif_error(unsupported_db).
×
152

153
get_sessions(_, _) ->
154
    erlang:nif_error(unsupported_db).
×
155

156
subscribe(_, _, _, _) ->
157
    erlang:nif_error(unsupported_db).
×
158

159
unsubscribe(_, _) ->
160
    erlang:nif_error(unsupported_db).
×
161

162
find_subscriber(_, _) ->
163
    erlang:nif_error(unsupported_db).
×
164

165
%%%===================================================================
166
%%% Internal functions
167
%%%===================================================================
168
encode_pfi(binary) -> 0;
×
169
encode_pfi(utf8) -> 1.
×
170

171
decode_pfi(0) -> binary;
×
172
decode_pfi(1) -> utf8.
×
173

174
encode_props([]) -> <<"">>;
×
175
encode_props(L) -> term_to_binary(L).
×
176

177
decode_props(<<"">>) -> [];
×
178
decode_props(Bin) -> binary_to_term(Bin).
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc