• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12653506733

07 Jan 2025 02:43PM UTC coverage: 82.006%. First build
12653506733

Pull #14494

github

web-flow
Merge 3485f4ef4 into 6e3b173ca
Pull Request #14494: feat(authz_mongo): support complex selectors

23 of 24 new or added lines in 3 files covered. (95.83%)

56775 of 69233 relevant lines covered (82.01%)

15122.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.39
/apps/emqx_mongodb/src/emqx_mongodb.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_mongodb).
17

18
-include_lib("emqx_resource/include/emqx_resource.hrl").
19
-include_lib("emqx_connector/include/emqx_connector.hrl").
20
-include_lib("typerefl/include/types.hrl").
21
-include_lib("hocon/include/hoconsc.hrl").
22
-include_lib("emqx/include/logger.hrl").
23
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
24

25
-behaviour(emqx_resource).
26
-behaviour(hocon_schema).
27

28
%% callbacks of behaviour emqx_resource
29
-export([
30
    resource_type/0,
31
    callback_mode/0,
32
    on_start/2,
33
    on_stop/2,
34
    on_query/3,
35
    on_get_status/2,
36
    namespace/0
37
]).
38

39
%% ecpool callback
40
-export([connect/1]).
41

42
-export([roots/0, fields/1, desc/1]).
43

44
-export([mongo_query/5, mongo_insert/3, check_worker_health/1]).
45

46
%% for testing
47
-export([maybe_resolve_srv_and_txt_records/1]).
48

49
-define(HEALTH_CHECK_TIMEOUT, 30000).
50
-define(DEFAULT_MONGO_LIMIT, 1000).
51
-define(DEFAULT_MONGO_BATCH_SIZE, 100).
52

53
%% mongo servers don't need parsing
54
-define(MONGO_HOST_OPTIONS, #{
55
    default_port => ?MONGO_DEFAULT_PORT
56
}).
57

58
%%=====================================================================
59

60
%% Schema namespace of this module: always the string "mongo".
namespace() -> "mongo".
11,396✔
61

62
%% Schema roots: a single `config` root whose type is the union of the three
%% deployment structs defined by fields/1 (single, rs, sharded).
roots() ->
    Variants = [hoconsc:ref(?MODULE, Kind) || Kind <- [single, rs, sharded]],
    [{config, #{type => hoconsc:union(Variants)}}].
74

75
%% Field definitions for every struct exposed by this schema.
%%
%% The "connector_*" structs hold only the deployment-specific fields; the
%% `single` / `rs` / `sharded` structs are those fields followed by the
%% fields shared by all deployments (`mongodb`).
fields("connector_rs") ->
    [
        mongo_type_field(rs, "rs_mongo_type"),
        {servers, servers()},
        {w_mode, fun w_mode/1},
        {r_mode, fun r_mode/1},
        {replica_set_name, fun replica_set_name/1}
    ];
fields("connector_sharded") ->
    [
        mongo_type_field(sharded, "sharded_mongo_type"),
        {servers, servers()},
        {w_mode, fun w_mode/1}
    ];
fields("connector_single") ->
    [
        mongo_type_field(single, "single_mongo_type"),
        {server, server()},
        {w_mode, fun w_mode/1}
    ];
fields(Type) when Type =:= rs; Type =:= single; Type =:= sharded ->
    DeploymentFields = fields("connector_" ++ atom_to_list(Type)),
    DeploymentFields ++ fields(mongodb);
fields(mongodb) ->
    CommonFields = [
        {srv_record, fun srv_record/1},
        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
        {username, fun emqx_connector_schema_lib:username/1},
        {password, emqx_connector_schema_lib:password_field()},
        {use_legacy_protocol,
            hoconsc:mk(hoconsc:enum([auto, true, false]), #{
                default => auto,
                desc => ?DESC("use_legacy_protocol")
            })},
        {auth_source, #{
            type => binary(),
            required => false,
            desc => ?DESC("auth_source")
        }},
        {database, fun emqx_connector_schema_lib:database/1},
        {topology, #{type => hoconsc:ref(?MODULE, topology), required => false}}
    ],
    CommonFields ++ emqx_connector_schema_lib:ssl_fields();
fields(topology) ->
    %% In most cases we don't need the topology pool as we use ecpool,
    %% hence the hidden pool_size with a default of 1.
    PoolSize = hoconsc:mk(pos_integer(), #{
        importance => ?IMPORTANCE_HIDDEN,
        default => 1
    }),
    HeartbeatFrequency = hoconsc:mk(emqx_schema:timeout_duration_ms(), #{
        default => <<"200s">>,
        desc => ?DESC("heartbeat_period")
    }),
    [
        {pool_size, PoolSize},
        {max_overflow, fun max_overflow/1},
        {overflow_ttl, duration("overflow_ttl")},
        {overflow_check_period, duration("overflow_check_period")},
        {local_threshold_ms, duration("local_threshold")},
        {connect_timeout_ms, duration("connect_timeout")},
        {socket_timeout_ms, duration("socket_timeout")},
        {server_selection_timeout_ms, duration("server_selection_timeout")},
        {wait_queue_timeout_ms, duration("wait_queue_timeout")},
        {heartbeat_frequency_ms, HeartbeatFrequency},
        {min_heartbeat_frequency_ms, duration("min_heartbeat_period")}
    ].

%% The required `mongo_type` discriminator field: a singleton atom type that
%% also serves as its own default.
mongo_type_field(Type, DescId) ->
    {mongo_type, #{
        required => true,
        type => Type,
        default => Type,
        desc => ?DESC(DescId)
    }}.
161

162
%% Description reference for a struct name; `undefined` for unknown structs.
%% A "connector_*" struct shares its description with the matching atom-named one.
desc(Name) when Name =:= "connector_single"; Name =:= single ->
    ?DESC("desc_single");
desc(Name) when Name =:= "connector_rs"; Name =:= rs ->
    ?DESC("desc_rs");
desc(Name) when Name =:= "connector_sharded"; Name =:= sharded ->
    ?DESC("desc_sharded");
desc(topology) ->
    ?DESC("desc_topology");
desc(_Other) ->
    undefined.
×
178

179
%% ===================================================================
180
%% emqx_resource callback: the resource type of this connector, the atom `mongodb`.
resource_type() -> mongodb.
123✔
181

182
%% emqx_resource callback: always returns `always_sync`.
%% NOTE(review): presumably this makes emqx_resource call on_query/3
%% synchronously - confirm against the emqx_resource behaviour.
callback_mode() -> always_sync.
123✔
183

184
%% emqx_resource callback: start the connection pool for one connector instance.
%%
%% Logs the (redacted) config, resolves the configured server(s) into a
%% `hosts` list, builds the pool options and starts the pool named InstId.
%% Returns `{ok, State}` with the pool name, deployment type and the default
%% collection (`<<"mqtt">>` unless `collection` is configured), or
%% `{error, Reason}` as returned by emqx_resource_pool:start/3.
on_start(
    InstId,
    #{mongo_type := Type, pool_size := PoolSize, ssl := SSL} = Config
) ->
    StartMsg =
        case Type of
            single -> "starting_mongodb_single_connector";
            rs -> "starting_mongodb_replica_set_connector";
            sharded -> "starting_mongodb_sharded_connector"
        end,
    ?SLOG(info, #{msg => StartMsg, connector => InstId, config => emqx_utils:redact(Config)}),
    %% `server` / `servers` are replaced by the resolved `hosts` list.
    #{hosts := Hosts} = Resolved = maybe_resolve_srv_and_txt_records(Config),
    TlsOpts =
        case maps:get(enable, SSL) of
            true ->
                [{ssl, true}, {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}];
            false ->
                [{ssl, false}]
        end,
    TopologyConf = maps:get(topology, Resolved, #{}),
    PoolOpts = [
        {mongo_type, init_type(Resolved)},
        {hosts, Hosts},
        {pool_size, PoolSize},
        {options, init_topology_options(maps:to_list(TopologyConf), [])},
        {worker_options, init_worker_options(maps:to_list(Resolved), TlsOpts)}
    ],
    DefaultCollection = maps:get(collection, Config, <<"mqtt">>),
    case emqx_resource_pool:start(InstId, ?MODULE, PoolOpts) of
        ok ->
            State = #{
                pool_name => InstId,
                type => Type,
                collection => DefaultCollection
            },
            {ok, State};
        {error, _Reason} = Error ->
            Error
    end.
229

230
%% emqx_resource callback: log the shutdown and stop the pool named InstId.
on_stop(InstId, _State) ->
    ?SLOG(info, #{msg => "stopping_mongodb_connector", connector => InstId}),
    emqx_resource_pool:stop(InstId).
107✔
236

237
%% emqx_resource callback: run one request on a worker picked from the pool.
%%
%% Supported requests:
%%   {insert, Document}
%%       inserts Document into the collection stored in the connector state.
%%       Returns `ok`, `{error, Reason}`, or
%%       `{error, {recoverable_error, ecpool_empty}}` when the pool has no worker.
%%   {find_one, Collection, Filter} | {find_one, Collection, Filter, Options}
%%   {find, Collection, Filter}     | {find, Collection, Filter, Options}
%%       delegated to on_select_query/3; the 3-element forms use `#{}` as Options.
on_query(
    InstId,
    {insert, Document},
    #{pool_name := PoolName, collection := Collection} = State
) ->
    Request = {insert, Collection, Document},
    ?TRACE(
        "QUERY",
        "mongodb_connector_received",
        #{request => Request, connector => InstId, state => State}
    ),
    case
        ecpool:pick_and_do(
            PoolName,
            {?MODULE, mongo_insert, [Collection, Document]},
            no_handover
        )
    of
        {{true, _Info}, _Document} ->
            ok;
        {{false, Reason}, _Document} ->
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            {error, Reason};
        {error, ecpool_empty} ->
            {error, {recoverable_error, ecpool_empty}};
        {error, Reason} ->
            %% Any other pool-level failure (e.g. the picked worker has no live
            %% client). Without this clause the call crashed with case_clause;
            %% report it the same way on_select_query/3 does.
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            {error, Reason}
    end;
on_query(InstId, {find_one, Collection, Filter}, State) ->
    on_select_query(InstId, {find_one, Collection, Filter, #{}}, State);
on_query(InstId, {find_one, _Collection, _Filter, _Options} = Request, State) ->
    on_select_query(InstId, Request, State);
on_query(InstId, {find, Collection, Filter}, State) ->
    on_select_query(InstId, {find, Collection, Filter, #{}}, State);
on_query(InstId, {find, _Collection, _Filter, _Options} = Request, State) ->
    on_select_query(InstId, Request, State).
149✔
276

277
%% Run a `find` / `find_one` request on a pool worker.
%%
%% Returns `{ok, Result}`. When the worker hands back a cursor pid, up to
%% `limit` documents (default ?DEFAULT_MONGO_LIMIT) are taken from it; any
%% other non-error value is returned as-is. Errors are logged and returned
%% as `{error, Reason}`, with `ecpool_empty` wrapped as recoverable.
on_select_query(
    InstId,
    {Action, Collection, Filter, Options} = Request,
    #{pool_name := PoolName} = State
) ->
    ?TRACE(
        "QUERY",
        "mongodb_connector_received",
        #{request => Request, connector => InstId, state => State}
    ),
    Call = {?MODULE, mongo_query, [Action, Collection, Filter, Options]},
    case ecpool:pick_and_do(PoolName, Call, no_handover) of
        {ok, Cursor} when is_pid(Cursor) ->
            MaxDocs = maps:get(limit, Options, ?DEFAULT_MONGO_LIMIT),
            {ok, mc_cursor:take(Cursor, MaxDocs)};
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            {error, select_error_reason(Reason)};
        Result ->
            {ok, Result}
    end.

%% An empty pool is reported as recoverable; every other reason passes through.
select_error_reason(ecpool_empty) -> {recoverable_error, ecpool_empty};
select_error_reason(Reason) -> Reason.
314

315
%% emqx_resource callback: report connector status from a pool-wide health check.
%% Emits an `emqx_connector_mongo_health_check` trace point for either outcome.
on_get_status(InstId, #{pool_name := PoolName}) ->
    case health_check(PoolName) of
        {error, Reason} ->
            ?tp(warning, emqx_connector_mongo_health_check, #{
                instance_id => InstId,
                reason => Reason,
                status => failed
            }),
            {?status_disconnected, Reason};
        ok ->
            ?tp(debug, emqx_connector_mongo_health_check, #{
                instance_id => InstId,
                status => ok
            }),
            ?status_connected
    end.
331

332
%% Run check_worker_health/1 on the pool's workers and fold the results.
%%
%% Returns `ok` only when every worker returned `ok`; otherwise the first
%% non-ok result as `{error, Reason}`. An empty result list is reported as
%% `{error, worker_processes_dead}`.
health_check(PoolName) ->
    %% Give the pool-level call one second more than a single worker check.
    Timeout = ?HEALTH_CHECK_TIMEOUT + timer:seconds(1),
    CheckFun = fun ?MODULE:check_worker_health/1,
    case
        emqx_resource_pool:health_check_workers(
            PoolName, CheckFun, Timeout, #{return_values => true}
        )
    of
        {ok, []} ->
            {error, worker_processes_dead};
        {ok, Values} ->
            case [V || V <- Values, V =/= ok] of
                [] ->
                    ok;
                [{error, Reason} | _] ->
                    {error, Reason};
                [Unexpected | _] ->
                    {error, Unexpected}
            end;
        {error, Reason} ->
            {error, Reason}
    end.
355

356
%% ===================================================================
357

358
%% Health probe for one pool worker.
%% Returns `ok` unless the test query returns `{error, Reason}` or raises,
%% in which case a warning is logged and `{error, _}` is returned.
check_worker_health(Conn) ->
    %% The query result itself is irrelevant; we only want to exercise the connection.
    try do_test_query(Conn) of
        {error, Reason} ->
            ?SLOG(warning, #{
                msg => "mongo_connection_get_status_error",
                reason => Reason
            }),
            {error, Reason};
        _Any ->
            ok
    catch
        Kind:Exception ->
            ?SLOG(warning, #{
                msg => "mongo_connection_get_status_exception",
                class => Kind,
                error => Exception
            }),
            {error, {Kind, Exception}}
    end.
378

379
%% Issue a `find_one` with an empty selector on collection <<"foo">> through
%% mongoc:transaction_query/4, bounded by ?HEALTH_CHECK_TIMEOUT.
do_test_query(Conn) ->
    FindOne = fun(#{pool := Worker} = Conf) ->
        Query = mongoc:find_one_query(Conf, <<"foo">>, #{}, #{}, 0),
        mc_worker_api:find_one(Worker, Query)
    end,
    mongoc:transaction_query(Conn, FindOne, #{}, ?HEALTH_CHECK_TIMEOUT).
389

390
%% ecpool callback: open one MongoDB connection for a pool worker.
%%
%% Opts is the proplist built by on_start/2; missing keys fall back to
%% `single` for mongo_type and `[]` for hosts, options and worker_options.
%% Returns whatever mongo_api:connect/4 returns.
connect(Opts) ->
    Type = proplists:get_value(mongo_type, Opts, single),
    Hosts = proplists:get_value(hosts, Opts, []),
    Options = proplists:get_value(options, Opts, []),
    WorkerOptions = proplists:get_value(worker_options, Opts, []),
    %% Worker options carry the password and TLS options, so they are redacted
    %% before logging. This runs for every pool worker on every (re)connect,
    %% so it is logged at debug rather than warning.
    ?SLOG(debug, #{
        msg => "connecting_mongodb",
        args => [Type, Hosts, Options, emqx_utils:redact(WorkerOptions)]
    }),
    mongo_api:connect(Type, Hosts, Options, WorkerOptions).
968✔
397

398
%% Executed on a pool worker: dispatch a select action to mongo_api.
%%
%% Options may carry `projector` (default `#{}`), `skip` (default 0) and,
%% for `find` only, `batch_size` (default ?DEFAULT_MONGO_BATCH_SIZE).
%% Any other action is a no-op returning `ok`.
mongo_query(Conn, find, Collection, Filter, Options) ->
    mongo_api:find(
        Conn,
        Collection,
        Filter,
        maps:get(projector, Options, #{}),
        maps:get(skip, Options, 0),
        maps:get(batch_size, Options, ?DEFAULT_MONGO_BATCH_SIZE)
    );
mongo_query(Conn, find_one, Collection, Filter, Options) ->
    mongo_api:find_one(
        Conn,
        Collection,
        Filter,
        maps:get(projector, Options, #{}),
        maps:get(skip, Options, 0)
    );
mongo_query(_Conn, _Action, _Collection, _Filter, _Options) ->
    ok.
×
409

410
%% Executed on a pool worker: insert Docs into Coll via mongo_api:insert/3.
mongo_insert(Conn, Coll, Docs) ->
    mongo_api:insert(Conn, Coll, Docs).
32✔
412

413
%% Connection type handed to the driver: `{rs, ReplicaSetName}` for a replica
%% set whose name is configured, otherwise the bare `mongo_type` atom.
init_type(#{mongo_type := Type} = Config) ->
    case Config of
        #{mongo_type := rs, replica_set_name := SetName} ->
            {rs, SetName};
        _ ->
            Type
    end.
100✔
417

418
%% Translate topology config pairs into driver options, prepending each
%% recognised option to Acc (so the result is in reverse input order).
%% Unrecognised entries are dropped.
init_topology_options([{Key, Val} | Rest], Acc) ->
    case topology_option_name(Key) of
        {ok, Name} -> init_topology_options(Rest, [{Name, Val} | Acc]);
        skip -> init_topology_options(Rest, Acc)
    end;
init_topology_options([_Other | Rest], Acc) ->
    init_topology_options(Rest, Acc);
init_topology_options([], Acc) ->
    Acc.

%% Driver-side name of a topology config key, or `skip` if it is not forwarded.
topology_option_name(pool_size) -> {ok, pool_size};
topology_option_name(max_overflow) -> {ok, max_overflow};
topology_option_name(overflow_ttl) -> {ok, overflow_ttl};
topology_option_name(overflow_check_period) -> {ok, overflow_check_period};
topology_option_name(local_threshold_ms) -> {ok, 'localThresholdMS'};
topology_option_name(connect_timeout_ms) -> {ok, 'connectTimeoutMS'};
topology_option_name(socket_timeout_ms) -> {ok, 'socketTimeoutMS'};
topology_option_name(server_selection_timeout_ms) -> {ok, 'serverSelectionTimeoutMS'};
topology_option_name(wait_queue_timeout_ms) -> {ok, 'waitQueueTimeoutMS'};
topology_option_name(heartbeat_frequency_ms) -> {ok, 'heartbeatFrequencyMS'};
topology_option_name(min_heartbeat_frequency_ms) -> {ok, 'minHeartbeatFrequencyMS'};
topology_option_name(_) -> skip.
124✔
444

445
%% Pick the worker-level options out of the connector config pairs, prepending
%% each to Acc. `username` is forwarded as `login`; the other recognised keys
%% keep their names; everything else is dropped.
init_worker_options([{Key, Val} | Rest], Acc) ->
    case worker_option_name(Key) of
        {ok, Name} -> init_worker_options(Rest, [{Name, Val} | Acc]);
        skip -> init_worker_options(Rest, Acc)
    end;
init_worker_options([_Other | Rest], Acc) ->
    init_worker_options(Rest, Acc);
init_worker_options([], Acc) ->
    Acc.

%% Driver-side name of a worker config key, or `skip` if it is not forwarded.
worker_option_name(username) ->
    {ok, login};
worker_option_name(Key) when
    Key =:= database;
    Key =:= auth_source;
    Key =:= password;
    Key =:= w_mode;
    Key =:= r_mode;
    Key =:= use_legacy_protocol
->
    {ok, Key};
worker_option_name(_) ->
    skip.
124✔
463

464
%% ===================================================================
465
%% Schema funcs
466

467
%% Schema for the `server` field, built with the Mongo host options.
server() ->
    emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MONGO_HOST_OPTIONS).
2,103✔
470

471
%% Schema for the `servers` field, built with the Mongo host options.
servers() ->
    emqx_schema:servers_sc(#{desc => ?DESC("servers")}, ?MONGO_HOST_OPTIONS).
3,496✔
474

475
%% Field schema for `w_mode`: enum of unsafe | safe, defaulting to `unsafe`.
w_mode(type) ->
    hoconsc:enum([unsafe, safe]);
w_mode(desc) ->
    ?DESC("w_mode");
w_mode(default) ->
    unsafe;
w_mode(_Attr) ->
    undefined.
74,031✔
479

480
%% Field schema for `r_mode`: enum of master | slave_ok, defaulting to `master`.
r_mode(type) ->
    hoconsc:enum([master, slave_ok]);
r_mode(desc) ->
    ?DESC("r_mode");
r_mode(default) ->
    master;
r_mode(_Attr) ->
    undefined.
23,998✔
484

485
%% Optional millisecond-duration field whose description id is Desc.
duration(Desc) ->
    DurationType = emqx_schema:timeout_duration_ms(),
    #{desc => ?DESC(Desc), required => false, type => DurationType}.
491

492
%% Field schema for `max_overflow`: non-negative integer, defaulting to 0.
max_overflow(type) ->
    non_neg_integer();
max_overflow(desc) ->
    ?DESC("max_overflow");
max_overflow(default) ->
    0;
max_overflow(_Attr) ->
    undefined.
56,944✔
496

497
%% Field schema for `replica_set_name`: a required binary with no default.
replica_set_name(type) ->
    binary();
replica_set_name(desc) ->
    ?DESC("replica_set_name");
replica_set_name(required) ->
    true;
replica_set_name(_Attr) ->
    undefined.
23,930✔
501

502
%% Field schema for `srv_record`: boolean, defaulting to `false`.
srv_record(type) ->
    boolean();
srv_record(desc) ->
    ?DESC("srv_record");
srv_record(default) ->
    false;
srv_record(_Attr) ->
    undefined.
56,942✔
506

507
%% ===================================================================
508
%% Internal funcs
509

510
%% Replace the `server` / `servers` config entry with a resolved `hosts` list
%% (plus any options taken from DNS TXT records when `srv_record` is enabled).
%% `server` takes precedence when both keys are present.
maybe_resolve_srv_and_txt_records(#{server := Server} = Config) ->
    maybe_resolve_srv_and_txt_records1(Server, maps:remove(server, Config));
maybe_resolve_srv_and_txt_records(#{servers := Servers} = Config) ->
    maybe_resolve_srv_and_txt_records1(Servers, maps:remove(servers, Config)).
48✔
516

517
%% Build the `hosts` entry of the config.
%%
%% srv_record = false: the configured servers are used directly. A replica-set
%%   config without `replica_set_name` is rejected with a thrown map.
%% srv_record = true: the first configured host is treated as a DNS name; its
%%   SRV records become the hosts and its TXT record may contribute extra options.
maybe_resolve_srv_and_txt_records1(
    Servers,
    #{mongo_type := Type, srv_record := false} = Config
) ->
    case {Type, maps:is_key(replica_set_name, Config)} of
        {rs, false} ->
            throw(#{
                reason => "missing_parameter",
                param => replica_set_name
            });
        _ ->
            HostPorts = parse_servers(Servers),
            Config#{hosts => format_hosts(HostPorts)}
    end;
maybe_resolve_srv_and_txt_records1(
    Servers,
    #{mongo_type := Type, srv_record := true} = Config
) ->
    %% With SRV in use a single DNS name is normally enough, but the schema
    %% allows several to be configured; only the first one is kept here.
    [{DNS, _IgnoredPort} | _Others] = parse_servers(Servers),
    Hosts = format_hosts(resolve_srv_records(DNS)),
    ?tp(info, resolved_srv_records, #{dns => DNS, resolved_hosts => Hosts}),
    TxtOpts = resolve_txt_records(Type, DNS),
    ?tp(info, resolved_txt_records, #{dns => DNS, resolved_options => TxtOpts}),
    maps:merge(Config#{hosts => Hosts}, TxtOpts).
6✔
551

552
%% Resolve the "_mongodb._tcp." SRV records of DNS0 into `{Host, Port}` pairs.
%% Throws a map when no usable record is found.
resolve_srv_records(DNS0) ->
    SrvName = "_mongodb._tcp." ++ DNS0,
    Records = emqx_connector_lib:resolve_dns(SrvName, srv),
    case [{Host, Port} || {_, _, Port, Host} <- Records] of
        [_ | _] = HostPorts ->
            HostPorts;
        [] ->
            throw(#{
                reason => "failed_to_resolve_srv_record",
                dns => SrvName
            })
    end.
564

565
%% Turn the TXT record of DNS into extra connector options.
%%
%% No record yields `#{}`; exactly one record holding a single query string
%% (e.g. "authSource=admin&replicaSet=atlas-wrnled-shard-0") is parsed and
%% converted; anything else is rejected with a thrown map.
resolve_txt_records(Type, DNS) ->
    case emqx_connector_lib:resolve_dns(DNS, txt) of
        [[QueryString]] = Resolved ->
            txt_query_to_options(Type, QueryString, Resolved);
        [] ->
            #{};
        Resolved ->
            throw(#{
                reason => "multiple_txt_records",
                resolved => Resolved
            })
    end.

%% Parse one TXT query string; Resolved is only used for the error report.
txt_query_to_options(Type, QueryString, Resolved) ->
    case uri_string:dissect_query(QueryString) of
        {error, _, _} ->
            throw(#{
                reason => "bad_txt_record_resolution",
                resolved => Resolved
            });
        Options ->
            convert_options(Type, normalize_options(Options))
    end.
586

587
%% Lowercase the name of every `{Name, Value}` pair, leaving values untouched.
normalize_options(Options) ->
    lists:map(
        fun({Name, Value}) -> {string:lowercase(Name), Value} end,
        Options
    ).
8✔
591

592
%% Map normalized TXT options onto config keys: `authSource` always, and
%% `replicaSet` additionally for replica-set deployments.
convert_options(rs, Options) ->
    AuthSource = maybe_add_option(auth_source, "authSource", Options),
    ReplicaSet = maybe_add_option(replica_set_name, "replicaSet", Options),
    maps:merge(AuthSource, ReplicaSet);
convert_options(_Type, Options) ->
    maybe_add_option(auth_source, "authSource", Options).
2✔
598

599
%% Look up OptName0 (case-insensitively, against already-lowercased names) in
%% Options. Returns `#{ConfigKey => BinaryValue}` when present, `#{}` otherwise.
maybe_add_option(ConfigKey, OptName0, Options) ->
    case lists:keyfind(string:lowercase(OptName0), 1, Options) of
        false ->
            #{};
        {_Name, Value} ->
            #{ConfigKey => iolist_to_binary(Value)}
    end.
607

608
%% Render a `{Host, Port}` pair as the binary <<"Host:Port">>.
format_host({Host, Port}) ->
    PortBin = integer_to_binary(Port),
    iolist_to_binary([Host, $:, PortBin]).
162✔
610

611
%% Render every `{Host, Port}` pair with format_host/1, preserving order.
format_hosts(Hosts) ->
    [format_host(HostPort) || HostPort <- Hosts].
134✔
613

614
%% Parse a configured server value with the Mongo host options and reduce each
%% parsed entry to a `{Host, Port}` pair.
parse_servers(HoconValue) ->
    ToHostPort = fun(#{hostname := Host, port := Port}) -> {Host, Port} end,
    Parsed = emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS),
    lists:map(ToHostPort, Parsed).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc