• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

processone / ejabberd / 603

17 Oct 2023 01:57PM UTC coverage: 32.654% (-0.4%) from 33.021%
603

push

github

badlop
Fixing minor typos in CHANGELOG

13497 of 41333 relevant lines covered (32.65%)

646.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/mod_mqtt_sql.erl
1
%%%-------------------------------------------------------------------
2
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
3
%%% @copyright (C) 2002-2023 ProcessOne, SARL. All Rights Reserved.
4
%%%
5
%%% Licensed under the Apache License, Version 2.0 (the "License");
6
%%% you may not use this file except in compliance with the License.
7
%%% You may obtain a copy of the License at
8
%%%
9
%%%     http://www.apache.org/licenses/LICENSE-2.0
10
%%%
11
%%% Unless required by applicable law or agreed to in writing, software
12
%%% distributed under the License is distributed on an "AS IS" BASIS,
13
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
%%% See the License for the specific language governing permissions and
15
%%% limitations under the License.
16
%%%
17
%%%-------------------------------------------------------------------
18
-module(mod_mqtt_sql).
19
-behaviour(mod_mqtt).
20

21
%% API
22
-export([init/2, publish/6, delete_published/2, lookup_published/2]).
23
-export([list_topics/1]).
24
%% Unsupported backend API
25
-export([init/0]).
26
-export([subscribe/4, unsubscribe/2, find_subscriber/2]).
27
-export([open_session/1, close_session/1, lookup_session/1, get_sessions/2]).
28

29
-include("logger.hrl").
30
-include("ejabberd_sql_pt.hrl").
31

32
%%%===================================================================
33
%%% API
34
%%%===================================================================
35
init() ->
36
    ?ERROR_MSG("Backend 'sql' is only supported for db_type", []),
×
37
    {error, db_failure}.
×
38

39
init(Host, _Opts) ->
40
    ejabberd_sql_schema:update_schema(Host, ?MODULE, schemas()),
×
41
    ok.
×
42

43
schemas() ->
44
    [#sql_schema{
×
45
        version = 1,
46
        tables =
47
            [#sql_table{
48
                name = <<"mqtt_pub">>,
49
                columns =
50
                    [#sql_column{name = <<"username">>, type = text},
51
                     #sql_column{name = <<"server_host">>, type = text},
52
                     #sql_column{name = <<"resource">>, type = text},
53
                     #sql_column{name = <<"topic">>, type = text},
54
                     #sql_column{name = <<"qos">>, type = smallint},
55
                     #sql_column{name = <<"payload">>, type = blob},
56
                     #sql_column{name = <<"payload_format">>, type = smallint},
57
                     #sql_column{name = <<"content_type">>, type = text},
58
                     #sql_column{name = <<"response_topic">>, type = text},
59
                     #sql_column{name = <<"correlation_data">>, type = blob},
60
                     #sql_column{name = <<"user_property">>, type = blob},
61
                     #sql_column{name = <<"expiry">>, type = bigint}],
62
                indices = [#sql_index{
63
                              columns = [<<"topic">>, <<"server_host">>],
64
                              unique = true}]}]}].
65

66
publish({U, LServer, R}, Topic, Payload, QoS, Props, ExpiryTime) ->
67
    PayloadFormat = encode_pfi(maps:get(payload_format_indicator, Props, binary)),
×
68
    ResponseTopic = maps:get(response_topic, Props, <<"">>),
×
69
    CorrelationData = maps:get(correlation_data, Props, <<"">>),
×
70
    ContentType = maps:get(content_type, Props, <<"">>),
×
71
    UserProps = encode_props(maps:get(user_property, Props, [])),
×
72
    case ?SQL_UPSERT(LServer, "mqtt_pub",
×
73
                     ["!topic=%(Topic)s",
74
                      "!server_host=%(LServer)s",
75
                      "username=%(U)s",
76
                      "resource=%(R)s",
77
                      "payload=%(Payload)s",
78
                      "qos=%(QoS)d",
79
                      "payload_format=%(PayloadFormat)d",
80
                      "response_topic=%(ResponseTopic)s",
81
                      "correlation_data=%(CorrelationData)s",
82
                      "content_type=%(ContentType)s",
83
                      "user_properties=%(UserProps)s",
84
                      "expiry=%(ExpiryTime)d"]) of
85
        ok -> ok;
×
86
        _Err -> {error, db_failure}
×
87
    end.
88

89
delete_published({_, LServer, _}, Topic) ->
90
    case ejabberd_sql:sql_query(
×
91
           LServer,
92
           ?SQL("delete from mqtt_pub where "
×
93
                "topic=%(Topic)s and %(LServer)H")) of
94
        {updated, _} -> ok;
×
95
        _Err -> {error, db_failure}
×
96
    end.
97

98
lookup_published({_, LServer, _}, Topic) ->
99
    case ejabberd_sql:sql_query(
×
100
           LServer,
101
           ?SQL("select @(payload)s, @(qos)d, @(payload_format)d, "
×
102
                "@(content_type)s, @(response_topic)s, "
103
                "@(correlation_data)s, @(user_properties)s, @(expiry)d "
104
                "from mqtt_pub where topic=%(Topic)s and %(LServer)H")) of
105
        {selected, [{Payload, QoS, PayloadFormat, ContentType,
106
                     ResponseTopic, CorrelationData, EncProps, Expiry}]} ->
107
            try decode_props(EncProps) of
×
108
                UserProps ->
109
                    try decode_pfi(PayloadFormat) of
×
110
                        PFI ->
111
                            Props = #{payload_format_indicator => PFI,
×
112
                                      content_type => ContentType,
113
                                      response_topic => ResponseTopic,
114
                                      correlation_data => CorrelationData,
115
                                      user_property => UserProps},
116
                            {ok, {Payload, QoS, Props, Expiry}}
×
117
                    catch _:badarg ->
118
                            ?ERROR_MSG("Malformed value of 'payload_format' column "
×
119
                                       "for topic '~ts'", [Topic]),
×
120
                            {error, db_failure}
×
121
                    end
122
            catch _:badarg ->
123
                    ?ERROR_MSG("Malformed value of 'user_properties' column "
×
124
                               "for topic '~ts'", [Topic]),
×
125
                    {error, db_failure}
×
126
            end;
127
        {selected, []} ->
128
            {error, notfound};
×
129
        _ ->
130
            {error, db_failure}
×
131
    end.
132

133
list_topics(LServer) ->
134
    case ejabberd_sql:sql_query(
×
135
           LServer,
136
           ?SQL("select @(topic)s from mqtt_pub where %(LServer)H")) of
×
137
        {selected, Res} ->
138
            {ok, [Topic || {Topic} <- Res]};
×
139
        _ ->
140
            {error, db_failure}
×
141
    end.
142

143
open_session(_) ->
144
    erlang:nif_error(unsupported_db).
×
145

146
close_session(_) ->
147
    erlang:nif_error(unsupported_db).
×
148

149
lookup_session(_) ->
150
    erlang:nif_error(unsupported_db).
×
151

152
get_sessions(_, _) ->
153
    erlang:nif_error(unsupported_db).
×
154

155
subscribe(_, _, _, _) ->
156
    erlang:nif_error(unsupported_db).
×
157

158
unsubscribe(_, _) ->
159
    erlang:nif_error(unsupported_db).
×
160

161
find_subscriber(_, _) ->
162
    erlang:nif_error(unsupported_db).
×
163

164
%%%===================================================================
165
%%% Internal functions
166
%%%===================================================================
167
encode_pfi(binary) -> 0;
×
168
encode_pfi(utf8) -> 1.
×
169

170
decode_pfi(0) -> binary;
×
171
decode_pfi(1) -> utf8.
×
172

173
encode_props([]) -> <<"">>;
×
174
encode_props(L) -> term_to_binary(L).
×
175

176
decode_props(<<"">>) -> [];
×
177
decode_props(Bin) -> binary_to_term(Bin).
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc