• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 6827041604

10 Nov 2023 04:03PM UTC coverage: 82.664% (-0.08%) from 82.742%
6827041604

push

github

web-flow
Merge pull request #11928 from thalesmg/fix-push-entrypoint-r53-20231110

ci: trigger push entrypoint for `release-5[0-9]`

35482 of 42923 relevant lines covered (82.66%)

6533.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.76
/apps/emqx_redis/src/emqx_redis.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_redis).
17

18
-include_lib("emqx_connector/include/emqx_connector.hrl").
19
-include_lib("typerefl/include/types.hrl").
20
-include_lib("hocon/include/hoconsc.hrl").
21
-include_lib("emqx/include/logger.hrl").
22

23
-export([roots/0, fields/1]).
24

25
-behaviour(emqx_resource).
26

27
%% callbacks of behaviour emqx_resource
28
-export([
29
    callback_mode/0,
30
    on_start/2,
31
    on_stop/2,
32
    on_query/3,
33
    on_get_status/2
34
]).
35

36
-export([do_get_status/1]).
37

38
-export([connect/1]).
39

40
-export([do_cmd/3]).
41

42
%% Redis hosts need no special parsing; only the default port is supplied here.
43
-define(REDIS_HOST_OPTIONS, #{
44
    default_port => ?REDIS_DEFAULT_PORT
45
}).
46

47
%%=====================================================================
48
%% Schema roots: a single `config' entry whose type is the union of the
%% `cluster', `single' and `sentinel' field sets defined by fields/1.
roots() ->
    Variants = [hoconsc:ref(?MODULE, Kind) || Kind <- [cluster, single, sentinel]],
    [{config, #{type => hoconsc:union(Variants)}}].
60

61
%% Field definitions for each Redis deployment type.
%%
%% Every variant is the concatenation of its own address/type fields, the
%% shared fields from redis_fields/0 and the SSL fields. The `cluster'
%% variant drops the `database' field from the shared set, and the
%% `sentinel' variant additionally requires a `sentinel' string.
fields(single) ->
    RedisType = #{
        type => single,
        default => single,
        required => false,
        desc => ?DESC("single")
    },
    lists:append([
        [{server, server()}, {redis_type, RedisType}],
        redis_fields(),
        emqx_connector_schema_lib:ssl_fields()
    ]);
fields(cluster) ->
    RedisType = #{
        type => cluster,
        default => cluster,
        required => false,
        desc => ?DESC("cluster")
    },
    lists:append([
        [{servers, servers()}, {redis_type, RedisType}],
        lists:keydelete(database, 1, redis_fields()),
        emqx_connector_schema_lib:ssl_fields()
    ]);
fields(sentinel) ->
    RedisType = #{
        type => sentinel,
        default => sentinel,
        required => false,
        desc => ?DESC("sentinel")
    },
    SentinelName = #{
        type => string(),
        required => true,
        desc => ?DESC("sentinel_desc")
    },
    lists:append([
        [{servers, servers()}, {redis_type, RedisType}, {sentinel, SentinelName}],
        redis_fields(),
        emqx_connector_schema_lib:ssl_fields()
    ]).
102

103
%% Schema for the `server' field, built with the Redis host options.
server() ->
    emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?REDIS_HOST_OPTIONS).
667✔
106

107
%% Schema for the `servers' field, built with the Redis host options.
servers() ->
    emqx_schema:servers_sc(#{desc => ?DESC("servers")}, ?REDIS_HOST_OPTIONS).
1,068✔
110

111
%% ===================================================================
112

113
%% emqx_resource callback: this connector always reports `always_sync'.
callback_mode() ->
    always_sync.
59✔
114

115
%% emqx_resource callback: start the Redis pool for `InstId'.
%%
%% Builds the pool options from `Config' (server addresses, credentials,
%% optional database, SSL and sentinel settings), records the pool name and
%% type as allocated resources, and then starts the pool.
%%
%% Returns `{ok, State}' with `State = #{pool_name => InstId, type => Type}'
%% on success, or `{error, Reason}' as reported by the pool start call.
on_start(InstId, #{redis_type := Type, pool_size := PoolSize, ssl := SSL} = Config) ->
    ?SLOG(info, #{
        msg => "starting_redis_connector",
        connector => InstId,
        config => emqx_utils:redact(Config)
    }),
    RawServers = maps:get(servers_conf_key(Type), Config),
    HostPorts = lists:map(
        fun to_host_port/1,
        emqx_schema:parse_servers(RawServers, ?REDIS_HOST_OPTIONS)
    ),
    DatabaseOpts = database_opts(Type, Config),
    BaseOpts = [
        {pool_size, PoolSize},
        {username, maps:get(username, Config, undefined)},
        {password, eredis_secret:wrap(maps:get(password, Config, ""))},
        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
    ],
    ClientOptions = ssl_opts(SSL) ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
    PoolOpts = lists:append([
        BaseOpts,
        DatabaseOpts,
        [{servers, HostPorts}],
        [{options, ClientOptions}]
    ]),
    %% Resources are registered before the pool is started, so on_stop/2 can
    %% find them via get_allocated_resources/1 even if starting fails.
    ok = emqx_resource:allocate_resource(InstId, type, Type),
    ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
    start_redis_pool(Type, InstId, PoolOpts, #{pool_name => InstId, type => Type}).

%% The `single' type keeps its address under `server'; all others use `servers'.
servers_conf_key(single) -> server;
servers_conf_key(_) -> servers.

%% Reduce a parsed server map to a `{Host, Port}' pair.
to_host_port(#{hostname := Host, port := Port}) ->
    {Host, Port}.

%% The `database' option is omitted for the `cluster' type.
database_opts(cluster, _Config) -> [];
database_opts(_Type, Config) -> [{database, maps:get(database, Config)}].

%% SSL client options derived from the `enable' flag of the SSL config.
ssl_opts(SSL) ->
    case maps:get(enable, SSL) of
        false ->
            [{ssl, false}];
        true ->
            [
                {ssl, true},
                {ssl_options, emqx_tls_lib:to_client_opts(SSL)}
            ]
    end.

%% Start an eredis_cluster pool for `cluster', an emqx_resource_pool otherwise.
start_redis_pool(cluster, InstId, PoolOpts, State) ->
    case eredis_cluster:start_pool(InstId, PoolOpts) of
        {error, Reason} ->
            {error, Reason};
        {ok, _} ->
            {ok, State};
        {ok, _, _} ->
            {ok, State}
    end;
start_redis_pool(_Type, InstId, PoolOpts, State) ->
    case emqx_resource_pool:start(InstId, ?MODULE, PoolOpts) of
        {error, Reason} ->
            {error, Reason};
        ok ->
            {ok, State}
    end.
184

185
%% emqx_resource callback: stop the pool recorded for `InstId'.
%%
%% The pool name and type are read from the allocated resources rather than
%% from the state argument, which is ignored. When no pool was recorded the
%% call is a no-op returning `ok'.
on_stop(InstId, _State) ->
    ?SLOG(info, #{
        msg => "stopping_redis_connector",
        connector => InstId
    }),
    stop_allocated_pool(emqx_resource:get_allocated_resources(InstId)).

%% Dispatch on the allocated resources: cluster pools are stopped through
%% eredis_cluster, every other pool through emqx_resource_pool.
stop_allocated_pool(#{pool_name := PoolName, type := cluster}) ->
    eredis_cluster:stop_pool(PoolName);
stop_allocated_pool(#{pool_name := PoolName, type := _}) ->
    emqx_resource_pool:stop(PoolName);
stop_allocated_pool(_) ->
    ok.
198

199
%% emqx_resource callback: run a single command (`{cmd, _}') or a list of
%% commands (`{cmds, _}') through do_query/3. Any other query shape fails
%% with `function_clause'.
on_query(InstId, {Kind, _} = Query, State) when Kind =:= cmd; Kind =:= cmds ->
    do_query(InstId, Query, State).
25✔
203

204
%% Execute `Query' against the pool and post-process error results.
%%
%% Errors are logged, then tagged:
%%   * `{error, {unrecoverable_error, Reason}}' when is_unrecoverable_error/1
%%     accepts the reason;
%%   * `{error, {recoverable_error, ecpool_empty}}' when the reason is
%%     `ecpool_empty';
%%   * otherwise the original `{error, Reason}' is returned untouched.
%% Non-error results are returned as-is.
do_query(InstId, Query, #{pool_name := PoolName, type := Type} = State) ->
    ?TRACE(
        "QUERY",
        "redis_connector_received",
        #{connector => InstId, query => Query, state => State}
    ),
    case run_query(Type, PoolName, Query) of
        {error, Reason} = Error ->
            ?SLOG(error, #{
                msg => "redis_connector_do_query_failed",
                connector => InstId,
                query => Query,
                reason => Reason
            }),
            classify_query_error(is_unrecoverable_error(Reason), Reason, Error);
        Other ->
            Other
    end.

%% Cluster queries go straight to do_cmd/3 with the pool name; other types
%% pick a pool worker and run do_cmd/3 on its connection.
run_query(cluster, PoolName, Query) ->
    do_cmd(PoolName, cluster, Query);
run_query(Type, PoolName, Query) ->
    ecpool:pick_and_do(PoolName, {?MODULE, do_cmd, [Type, Query]}, no_handover).

%% Map the classification flag and reason onto the final error term.
classify_query_error(true, Reason, _Error) ->
    {error, {unrecoverable_error, Reason}};
classify_query_error(false, ecpool_empty, _Error) ->
    {error, {recoverable_error, ecpool_empty}};
classify_query_error(false, _Reason, Error) ->
    Error.
234

235
%% Decide whether a query failure can never succeed on retry.
%%
%% Accepts three shapes:
%%   * a list of per-command results (batch case) - unrecoverable when any
%%     element is;
%%   * an `{error, Reason}' tuple, as found inside such a list;
%%   * a bare `Reason', which is what do_query/3 passes after unwrapping the
%%     `{error, Reason}' result of a single command.
%%
%% Only "ERR unknown command ..." replies and `invalid_cluster_command' are
%% treated as unrecoverable; everything else yields `false'.
is_unrecoverable_error(Results) when is_list(Results) ->
    lists:any(fun is_unrecoverable_error/1, Results);
is_unrecoverable_error({error, Reason}) ->
    is_unrecoverable_error(Reason);
is_unrecoverable_error(<<"ERR unknown command ", _/binary>>) ->
    true;
is_unrecoverable_error(invalid_cluster_command) ->
    true;
is_unrecoverable_error(_) ->
    false.
19✔
243

244
%% emqx_resource callback: report the connection status of the pool.
%%
%% For the `cluster' type a missing pool is reported as `disconnected';
%% otherwise the ping result is mapped by status_result/1. For every other
%% type the pool workers are health-checked with do_get_status/1 and the
%% result is mapped by status_result/1.
on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
    case eredis_cluster:pool_exists(PoolName) of
        false ->
            disconnected;
        true ->
            status_result(eredis_cluster:ping_all(PoolName))
    end;
on_get_status(_InstId, #{pool_name := PoolName}) ->
    Checker = fun ?MODULE:do_get_status/1,
    status_result(emqx_resource_pool:health_check_workers(PoolName, Checker)).
74✔
255

256
%% Health check for one connection: send PING and report `true' only when
%% the reply is an `{ok, _}' tuple.
do_get_status(Conn) ->
    ping_succeeded(eredis:q(Conn, ["PING"])).

%% Any reply other than `{ok, _}' counts as a failed ping.
ping_succeeded({ok, _}) -> true;
ping_succeeded(_) -> false.
261

262
%% Translate a boolean health-check outcome into a resource status atom.
status_result(true) ->
    connected;
status_result(false) ->
    connecting.
×
264

265
%% Run a query on Redis.
%%
%% For the `cluster' type the first argument is the pool name and commands
%% go through eredis_cluster; for any other type it is an eredis connection.
%% `{cmd, Command}' returns the raw reply of the client call, while
%% `{cmds, Commands}' returns the replies folded by wrap_qp_result/1.
do_cmd(PoolName, cluster, {cmd, Command}) ->
    eredis_cluster:q(PoolName, Command);
do_cmd(PoolName, cluster, {cmds, Commands}) ->
    % TODO
    % Cluster mode is currently incompatible with batching.
    % Commands are therefore issued one at a time, in list order.
    Replies = [eredis_cluster:q(PoolName, Command) || Command <- Commands],
    wrap_qp_result(Replies);
do_cmd(Conn, _Type, {cmd, Command}) ->
    eredis:q(Conn, Command);
do_cmd(Conn, _Type, {cmds, Commands}) ->
    Replies = eredis:qp(Conn, Commands),
    wrap_qp_result(Replies).
23✔
275

276
%% Fold the result of a multi-command query into a single tagged tuple.
%%
%% A list of replies becomes `{ok, Replies}' when every element is
%% `{ok, _}', and `{error, Replies}' as soon as one `{error, _}' is found.
%% An `{error, _}' tuple passed in place of the list is returned unchanged.
wrap_qp_result(Results) when is_list(Results) ->
    case lists:all(fun is_ok_reply/1, Results) of
        true -> {ok, Results};
        false -> {error, Results}
    end;
wrap_qp_result({error, _} = Error) ->
    Error.

%% Replies are expected to be `{ok, _}' or `{error, _}' only.
is_ok_reply({ok, _}) -> true;
is_ok_reply({error, _}) -> false.
290

291
%% ===================================================================
292
%% Start a linked eredis client with the given options.
%% NOTE(review): presumably invoked by the resource pool for each worker,
%% since on_start/2 passes ?MODULE to emqx_resource_pool:start/3 - confirm.
connect(ClientOpts) ->
    eredis:start_link(ClientOpts).
437✔
294

295
%% Schema fields shared by all Redis deployment types: pool size,
%% credentials, database index (default 0) and auto-reconnect.
redis_fields() ->
    Database = #{
        type => non_neg_integer(),
        default => 0,
        desc => ?DESC("database")
    },
    [
        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
        {username, fun emqx_connector_schema_lib:username/1},
        {password, fun emqx_connector_schema_lib:password/1},
        {database, Database},
        {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
    ].
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc