• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12235783303

09 Dec 2024 12:36PM CUT coverage: 82.037%. First build
12235783303

Pull #14362

github

web-flow
Merge 4819ded51 into 83154d24b
Pull Request #14362: refactor(resource): forbid changing resource state from `on_get_status` return

62 of 82 new or added lines in 27 files covered. (75.61%)

56457 of 68819 relevant lines covered (82.04%)

15149.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.35
/apps/emqx_postgresql/src/emqx_postgresql.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_postgresql).
17

18
-include("emqx_postgresql.hrl").
19
-include_lib("emqx_resource/include/emqx_resource.hrl").
20
-include_lib("emqx_connector/include/emqx_connector.hrl").
21
-include_lib("typerefl/include/types.hrl").
22
-include_lib("emqx/include/logger.hrl").
23
-include_lib("hocon/include/hoconsc.hrl").
24
-include_lib("epgsql/include/epgsql.hrl").
25
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
26

27
-export([roots/0, fields/1, namespace/0]).
28

29
-behaviour(emqx_resource).
30

31
%% callbacks of behaviour emqx_resource
32
-export([
33
    resource_type/0,
34
    callback_mode/0,
35
    on_start/2,
36
    on_stop/2,
37
    on_query/3,
38
    on_batch_query/3,
39
    on_get_status/2,
40
    on_add_channel/4,
41
    on_remove_channel/3,
42
    on_get_channels/1,
43
    on_get_channel_status/3,
44
    on_format_query_result/1
45
]).
46

47
-export([connect/1]).
48

49
-export([
50
    query/3,
51
    prepared_query/3,
52
    execute_batch/3
53
]).
54

55
-export([disable_prepared_statements/0]).
56

57
%% for ecpool workers usage
58
-export([do_get_status/1, prepare_sql_to_conn/2, get_reconnect_callback_signature/1]).
59

60
-define(PGSQL_HOST_OPTIONS, #{
61
    default_port => ?PGSQL_DEFAULT_PORT
62
}).
63

64
-type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
65
-type state() ::
66
    #{
67
        pool_name := binary(),
68
        query_templates := #{binary() => template()},
69
        prepares := disabled | #{binary() => epgsql:statement()} | {error, _}
70
    }.
71

72
%% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
73
%% We want to be able to call sync if any message from the backend leaves the driver in an
74
%% inconsistent state needing sync.
75
-dialyzer({nowarn_function, [execute_batch/3]}).
76

77
%%=====================================================================
78

79
namespace() -> postgres.
×
80

81
roots() ->
82
    [{config, #{type => hoconsc:ref(?MODULE, config)}}].
2✔
83

84
fields(config) ->
85
    [
86
        {server, server()},
87
        disable_prepared_statements()
88
    ] ++
2,591✔
89
        adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
90
        emqx_connector_schema_lib:ssl_fields() ++
91
        emqx_connector_schema_lib:prepare_statement_fields().
92

93
server() ->
94
    Meta = #{desc => ?DESC("server")},
2,591✔
95
    emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
2,591✔
96

97
disable_prepared_statements() ->
98
    {disable_prepared_statements,
40,910✔
99
        hoconsc:mk(
100
            boolean(),
101
            #{
102
                default => false,
103
                required => false,
104
                desc => ?DESC("disable_prepared_statements")
105
            }
106
        )}.
107

108
adjust_fields(Fields) ->
109
    lists:map(
2,591✔
110
        fun
111
            ({username, Sc}) ->
112
                %% to please dialyzer...
113
                Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
2,591✔
114
                {username, hocon_schema:override(Sc, Override)};
2,591✔
115
            (Field) ->
116
                Field
10,364✔
117
        end,
118
        Fields
119
    ).
120

121
%% ===================================================================
122
resource_type() -> pgsql.
238✔
123

124
callback_mode() -> always_sync.
238✔
125

126
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
127
on_start(
128
    InstId,
129
    #{
130
        server := Server,
131
        disable_prepared_statements := DisablePreparedStatements,
132
        database := DB,
133
        username := User,
134
        pool_size := PoolSize,
135
        ssl := SSL
136
    } = Config
137
) ->
138
    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
237✔
139
    ?SLOG(info, #{
237✔
140
        msg => "starting_postgresql_connector",
141
        connector => InstId,
142
        config => emqx_utils:redact(Config)
143
    }),
237✔
144
    SslOpts =
237✔
145
        case maps:get(enable, SSL) of
146
            true ->
147
                [
79✔
148
                    %% note: this is converted to `required' in
149
                    %% `conn_opts/2', and there's a boolean guard
150
                    %% there; if this is set to `required' here,
151
                    %% that'll require changing `conn_opts/2''s guard.
152
                    {ssl, true},
153
                    {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
154
                ];
155
            false ->
156
                [{ssl, false}]
158✔
157
        end,
158
    Options = [
237✔
159
        {host, Host},
160
        {port, Port},
161
        {username, User},
162
        {password, maps:get(password, Config, emqx_secret:wrap(""))},
163
        {database, DB},
164
        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
165
        {pool_size, PoolSize}
166
    ],
167
    State1 = parse_sql_template(Config, <<"send_message">>),
237✔
168
    State2 = State1#{installed_channels => #{}},
237✔
169
    case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
237✔
170
        ok ->
171
            Prepares =
204✔
172
                case DisablePreparedStatements of
173
                    true -> disabled;
28✔
174
                    false -> #{}
176✔
175
                end,
176
            case init_prepare(State2#{pool_name => InstId, prepares => Prepares}) of
204✔
177
                #{prepares := {error, _} = Error} ->
178
                    Error;
2✔
179
                State ->
180
                    {ok, State}
202✔
181
            end;
182
        {error, Reason} ->
183
            ?tp(
33✔
184
                pgsql_connector_start_failed,
185
                #{error => Reason}
186
            ),
187
            {error, Reason}
33✔
188
    end.
189

190
on_stop(InstId, State) ->
191
    ?SLOG(info, #{
203✔
192
        msg => "stopping_postgresql_connector",
193
        connector => InstId
194
    }),
203✔
195
    close_connections(State),
203✔
196
    Res = emqx_resource_pool:stop(InstId),
203✔
197
    ?tp(postgres_stopped, #{instance_id => InstId}),
203✔
198
    Res.
203✔
199

200
close_connections(#{pool_name := PoolName} = _State) ->
201
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
203✔
202
    close_connections_with_worker_pids(WorkerPids),
203✔
203
    ok.
203✔
204

205
close_connections_with_worker_pids([WorkerPid | Rest]) ->
206
    %% We ignore errors since any error probably means that the
207
    %% connection is closed already.
208
    try ecpool_worker:client(WorkerPid) of
1,610✔
209
        {ok, Conn} ->
210
            _ = epgsql:close(Conn),
1,460✔
211
            close_connections_with_worker_pids(Rest);
1,460✔
212
        _ ->
213
            close_connections_with_worker_pids(Rest)
150✔
214
    catch
215
        _:_ ->
216
            close_connections_with_worker_pids(Rest)
×
217
    end;
218
close_connections_with_worker_pids([]) ->
219
    ok.
203✔
220

221
on_add_channel(
222
    _InstId,
223
    #{
224
        installed_channels := InstalledChannels
225
    } = OldState,
226
    ChannelId,
227
    ChannelConfig
228
) ->
229
    %% The following will throw an exception if the bridge producers fails to start
230
    {ok, ChannelState} = create_channel_state(ChannelId, OldState, ChannelConfig),
175✔
231
    case ChannelState of
175✔
232
        #{prepares := {error, Reason}} ->
233
            {error, {unhealthy_target, Reason}};
33✔
234
        _ ->
235
            NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
142✔
236
            %% Update state
237
            NewState = OldState#{installed_channels => NewInstalledChannels},
142✔
238
            {ok, NewState}
142✔
239
    end.
240

241
create_channel_state(
242
    ChannelId,
243
    #{
244
        pool_name := PoolName,
245
        prepares := Prepares
246
    } = _ConnectorState,
247
    #{parameters := Parameters} = _ChannelConfig
248
) ->
249
    State1 = parse_sql_template(Parameters, ChannelId),
175✔
250
    {ok,
175✔
251
        init_prepare(State1#{
252
            pool_name => PoolName,
253
            prepares => Prepares,
254
            prepare_statement => #{}
255
        })}.
256

257
on_remove_channel(
258
    _InstId,
259
    #{
260
        installed_channels := InstalledChannels
261
    } = OldState,
262
    ChannelId
263
) ->
264
    %% Close prepared statements
265
    ok = close_prepared_statement(ChannelId, OldState),
140✔
266
    NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
140✔
267
    %% Update state
268
    NewState = OldState#{installed_channels => NewInstalledChannels},
140✔
269
    {ok, NewState}.
140✔
270

271
close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
272
    ok;
24✔
273
close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
274
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
116✔
275
    close_prepared_statement(WorkerPids, ChannelId, State),
116✔
276
    ok.
116✔
277

278
close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
279
    %% We ignore errors since any error probably means that the
280
    %% prepared statement doesn't exist. If it exists when we try
281
    %% to insert one with the same name, we will try to remove it
282
    %% again anyway.
283
    try ecpool_worker:client(WorkerPid) of
928✔
284
        {ok, Conn} ->
285
            ok = ecpool_worker:remove_reconnect_callback_by_signature(WorkerPid, ChannelId),
784✔
286
            case get_templated_statement(ChannelId, State) of
784✔
287
                {ok, Statement} ->
288
                    _ = epgsql:close(Conn, Statement),
784✔
289
                    close_prepared_statement(Rest, ChannelId, State);
784✔
290
                error ->
291
                    %% channel was not added
292
                    ok
×
293
            end;
294
        _ ->
295
            close_prepared_statement(Rest, ChannelId, State)
144✔
296
    catch
297
        _:_ ->
298
            close_prepared_statement(Rest, ChannelId, State)
×
299
    end;
300
close_prepared_statement([], _ChannelId, _State) ->
301
    ok.
116✔
302

303
on_get_channel_status(
304
    _ResId,
305
    ChannelId,
306
    #{
307
        pool_name := PoolName,
308
        installed_channels := Channels
309
    } = _State
310
) ->
311
    ChannelState = maps:get(ChannelId, Channels),
213✔
312
    case
213✔
313
        do_check_channel_sql(
314
            PoolName,
315
            ChannelId,
316
            ChannelState
317
        )
318
    of
319
        ok ->
320
            ?status_connected;
203✔
321
        {error, undefined_table} ->
322
            {?status_disconnected, {unhealthy_target, <<"Table does not exist">>}}
8✔
323
    end.
324

325
do_check_channel_sql(
326
    PoolName,
327
    ChannelId,
328
    #{query_templates := ChannelQueryTemplates} = _ChannelState
329
) ->
330
    {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
213✔
331
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
213✔
332
    validate_table_existence(WorkerPids, SQL).
213✔
333

334
on_get_channels(ResId) ->
335
    emqx_bridge_v2:get_channels_for_connector(ResId).
893✔
336

337
-spec on_query
338
    %% Called from authn and authz modules
339
    (connector_resource_id(), {prepared_query, binary(), [term()]}, state()) ->
340
        {ok, _} | {error, term()};
341
    %% Called from bridges
342
    (connector_resource_id(), {action_resource_id(), map()}, state()) ->
343
        {ok, _} | {error, term()}.
344
on_query(InstId, {TypeOrKey, NameOrMap}, State) ->
345
    on_query(InstId, {TypeOrKey, NameOrMap, []}, State);
97✔
346
on_query(
347
    InstId,
348
    {TypeOrKey, NameOrMap, Params},
349
    #{pool_name := PoolName} = State
350
) ->
351
    ?TRACE("QUERY", "postgresql_connector_received_sql_query", #{
203✔
352
        connector => InstId,
353
        type => TypeOrKey,
354
        sql => NameOrMap,
355
        state => State
356
    }),
203✔
357
    {QueryType, NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrMap, Params, State),
203✔
358
    emqx_trace:rendered_action_template(
203✔
359
        TypeOrKey,
360
        #{
361
            statement_type => QueryType,
362
            statement_or_name => NameOrSQL2,
363
            data => Data
364
        }
365
    ),
366
    Res = on_sql_query(InstId, PoolName, QueryType, NameOrSQL2, Data),
203✔
367
    ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
200✔
368
    handle_result(Res).
200✔
369

370
on_batch_query(
371
    InstId,
372
    [{Key, _} = Request | _] = BatchReq,
373
    #{pool_name := PoolName} = State
374
) ->
375
    BinKey = to_bin(Key),
46✔
376
    case get_template(BinKey, State) of
46✔
377
        undefined ->
378
            Log = #{
4✔
379
                connector => InstId,
380
                first_request => Request,
381
                state => State,
382
                msg => "batch prepare not implemented"
383
            },
384
            ?SLOG(error, Log),
4✔
385
            {error, {unrecoverable_error, batch_prepare_not_implemented}};
4✔
386
        {_Statement, RowTemplate} ->
387
            {ok, StatementTemplate} = get_templated_statement(BinKey, State),
42✔
388
            Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
42✔
389
            emqx_trace:rendered_action_template(
42✔
390
                Key,
391
                #{
392
                    statement_type => execute_batch,
393
                    statement_or_name => StatementTemplate,
394
                    data => Rows
395
                }
396
            ),
397
            case on_sql_query(InstId, PoolName, execute_batch, StatementTemplate, Rows) of
42✔
398
                {error, _Error} = Result ->
399
                    handle_result(Result);
3✔
400
                {_Column, Results} ->
401
                    handle_batch_result(Results, 0)
38✔
402
            end
403
    end;
404
on_batch_query(InstId, BatchReq, State) ->
405
    ?SLOG(error, #{
4✔
406
        connector => InstId,
407
        request => BatchReq,
408
        state => State,
409
        msg => "invalid request"
410
    }),
×
411
    {error, {unrecoverable_error, invalid_request}}.
4✔
412

413
proc_sql_params(ActionResId, #{} = Map, [], State) when is_binary(ActionResId) ->
414
    %% When this connector is called from actions/bridges.
415
    DisablePreparedStatements = prepared_statements_disabled(State),
31✔
416
    {ExprTemplate, RowTemplate} = get_template(ActionResId, State),
31✔
417
    Rendered = render_prepare_sql_row(RowTemplate, Map),
31✔
418
    case DisablePreparedStatements of
31✔
419
        true ->
420
            {query, ExprTemplate, Rendered};
6✔
421
        false ->
422
            {prepared_query, ActionResId, Rendered}
25✔
423
    end;
424
proc_sql_params(prepared_query, ConnResId, Params, State) ->
425
    %% When this connector is called from authn/authz modules
426
    DisablePreparedStatements = prepared_statements_disabled(State),
79✔
427
    case DisablePreparedStatements of
79✔
428
        true ->
429
            #{query_templates := #{ConnResId := {ExprTemplate, _VarsTemplate}}} = State,
9✔
430
            {query, ExprTemplate, Params};
9✔
431
        false ->
432
            %% Connector resource id itself is the prepared statement name
433
            {prepared_query, ConnResId, Params}
70✔
434
    end;
435
proc_sql_params(QueryType, SQL, Params, _State) when
436
    is_atom(QueryType) andalso
437
        (is_binary(SQL) orelse is_list(SQL)) andalso
438
        is_list(Params)
439
->
440
    %% When called to do ad-hoc commands/queries.
441
    {QueryType, SQL, Params}.
93✔
442

443
prepared_statements_disabled(State) ->
444
    maps:get(prepares, State, #{}) =:= disabled.
110✔
445

446
get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
447
    BinKey = to_bin(Key),
73✔
448
    ChannelState = maps:get(BinKey, Channels),
73✔
449
    ChannelQueryTemplates = maps:get(query_templates, ChannelState),
73✔
450
    maps:get(BinKey, ChannelQueryTemplates);
73✔
451
get_template(Key, #{query_templates := Templates}) ->
452
    BinKey = to_bin(Key),
4✔
453
    maps:get(BinKey, Templates, undefined).
4✔
454

455
get_templated_statement(Key, #{installed_channels := Channels} = _State) ->
456
    BinKey = to_bin(Key),
826✔
457
    case is_map_key(BinKey, Channels) of
826✔
458
        true ->
459
            ChannelState = maps:get(BinKey, Channels),
826✔
460
            case ChannelState of
826✔
461
                #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
462
                    {ok, ExprTemplate};
6✔
463
                #{prepares := #{BinKey := ExprTemplate}} ->
464
                    {ok, ExprTemplate}
820✔
465
            end;
466
        false ->
467
            error
×
468
    end;
469
get_templated_statement(Key, #{prepares := PrepStatements}) ->
470
    BinKey = to_bin(Key),
×
471
    {ok, maps:get(BinKey, PrepStatements)}.
×
472

473
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
474
    try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
245✔
475
        {error, Reason} ->
476
            ?tp(
14✔
477
                pgsql_connector_query_return,
478
                #{error => Reason}
479
            ),
480
            TranslatedError = translate_to_log_context(Reason),
14✔
481
            ?SLOG(
14✔
482
                error,
14✔
483
                maps:merge(
484
                    #{
485
                        msg => "postgresql_connector_do_sql_query_failed",
486
                        connector => InstId,
487
                        type => Type,
488
                        sql => NameOrSQL
489
                    },
490
                    TranslatedError
491
                )
×
492
            ),
493
            case Reason of
14✔
494
                sync_required ->
495
                    {error, {recoverable_error, Reason}};
×
496
                ecpool_empty ->
497
                    {error, {recoverable_error, Reason}};
×
498
                {error, error, _, undefined_table, _, _} ->
499
                    {error, {unrecoverable_error, export_error(TranslatedError)}};
×
500
                _ ->
501
                    {error, export_error(TranslatedError)}
14✔
502
            end;
503
        Result ->
504
            ?tp(
223✔
505
                pgsql_connector_query_return,
506
                #{result => Result}
507
            ),
508
            Result
223✔
509
    catch
510
        error:function_clause:Stacktrace ->
511
            ?SLOG(error, #{
4✔
512
                msg => "postgresql_connector_do_sql_query_failed",
513
                connector => InstId,
514
                type => Type,
515
                sql => NameOrSQL,
516
                reason => function_clause,
517
                stacktrace => Stacktrace
518
            }),
×
519
            {error, {unrecoverable_error, invalid_request}}
4✔
520
    end.
521

522
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
523
    case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
471✔
524
        true ->
525
            case do_check_prepares(State) of
453✔
526
                ok ->
527
                    ?status_connected;
453✔
528
                {error, undefined_table} ->
529
                    %% return error indicating that we are connected but the target table
530
                    %% is not created
NEW
531
                    {?status_disconnected, unhealthy_target}
×
532
            end;
533
        false ->
534
            ?status_connecting
16✔
535
    end.
536

537
do_get_status(Conn) ->
538
    ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
3,650✔
539

540
do_check_prepares(
541
    #{
542
        pool_name := PoolName,
543
        query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
544
    }
545
) ->
546
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
×
547
    case validate_table_existence(WorkerPids, SQL) of
×
548
        ok ->
549
            ok;
×
550
        {error, Reason} ->
551
            {error, Reason}
×
552
    end;
553
do_check_prepares(_) ->
554
    ok.
453✔
555

556
-spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
557
validate_table_existence([WorkerPid | Rest], SQL) ->
558
    try ecpool_worker:client(WorkerPid) of
213✔
559
        {ok, Conn} ->
560
            case epgsql:parse2(Conn, "", SQL, []) of
213✔
561
                {error, {_, _, _, undefined_table, _, _}} ->
562
                    {error, undefined_table};
8✔
563
                Res when is_tuple(Res) andalso ok == element(1, Res) ->
564
                    ok;
203✔
565
                Res ->
566
                    ?tp(postgres_connector_bad_parse2, #{result => Res}),
×
567
                    validate_table_existence(Rest, SQL)
×
568
            end;
569
        _ ->
570
            validate_table_existence(Rest, SQL)
×
571
    catch
572
        exit:{noproc, _} ->
573
            validate_table_existence(Rest, SQL)
×
574
    end;
575
validate_table_existence([], _SQL) ->
576
    %% All workers either replied an unexpected error; we will retry
577
    %% on the next health check.
578
    ok.
×
579

580
%% ===================================================================
581

582
connect(Opts) ->
583
    Host = proplists:get_value(host, Opts),
1,899✔
584
    Username = proplists:get_value(username, Opts),
1,899✔
585
    %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
586
    Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
1,899✔
587
    case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
1,899✔
588
        {ok, _Conn} = Ok ->
589
            Ok;
1,636✔
590
        {error, Reason} ->
591
            {error, Reason}
263✔
592
    end.
593

594
query(Conn, SQL, Params) ->
595
    case epgsql:equery(Conn, SQL, Params) of
108✔
596
        {error, sync_required} = Res ->
597
            ok = epgsql:sync(Conn),
×
598
            Res;
×
599
        Res ->
600
            Res
102✔
601
    end.
602

603
prepared_query(Conn, Name, Params) ->
604
    case epgsql:prepared_query2(Conn, Name, Params) of
92✔
605
        {error, sync_required} = Res ->
606
            ok = epgsql:sync(Conn),
×
607
            Res;
×
608
        Res ->
609
            Res
91✔
610
    end.
611

612
execute_batch(Conn, Statement, Params) ->
613
    case epgsql:execute_batch(Conn, Statement, Params) of
39✔
614
        {error, sync_required} = Res ->
615
            ok = epgsql:sync(Conn),
×
616
            Res;
×
617
        Res ->
618
            Res
38✔
619
    end.
620

621
conn_opts(Opts) ->
622
    conn_opts(Opts, []).
1,899✔
623
conn_opts([], Acc) ->
624
    Acc;
1,899✔
625
conn_opts([Opt = {database, _} | Opts], Acc) ->
626
    conn_opts(Opts, [Opt | Acc]);
1,899✔
627
conn_opts([{ssl, Bool} | Opts], Acc) when is_boolean(Bool) ->
628
    Flag =
1,899✔
629
        case Bool of
630
            true -> required;
640✔
631
            false -> false
1,259✔
632
        end,
633
    conn_opts(Opts, [{ssl, Flag} | Acc]);
1,899✔
634
conn_opts([Opt = {port, _} | Opts], Acc) ->
635
    conn_opts(Opts, [Opt | Acc]);
1,899✔
636
conn_opts([Opt = {timeout, _} | Opts], Acc) ->
637
    conn_opts(Opts, [Opt | Acc]);
×
638
conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
639
    conn_opts(Opts, [Opt | Acc]);
640✔
640
conn_opts([_Opt | Opts], Acc) ->
641
    conn_opts(Opts, Acc).
7,596✔
642

643
parse_sql_template(Config, ChannelId) ->
644
    Queries =
412✔
645
        case Config of
646
            #{prepare_statement := Qs} ->
647
                Qs;
73✔
648
            #{sql := Query} ->
649
                #{ChannelId => Query};
175✔
650
            #{} ->
651
                #{}
164✔
652
        end,
653
    Templates = maps:fold(fun parse_sql_template/3, #{}, Queries),
412✔
654
    #{query_templates => Templates}.
412✔
655

656
parse_sql_template(Key, Query, Acc) ->
657
    Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
248✔
658
    Acc#{Key => Template}.
248✔
659

660
render_prepare_sql_row(RowTemplate, Data) ->
661
    % NOTE: ignoring errors here, missing variables will be replaced with `null`.
662
    {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
127✔
663
    Row.
127✔
664

665
init_prepare(State = #{prepares := disabled}) ->
666
    State;
52✔
667
init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
668
    State;
137✔
669
init_prepare(State = #{}) ->
670
    case prepare_sql(State) of
190✔
671
        {ok, PrepStatements} ->
672
            State#{prepares => PrepStatements};
155✔
673
        Error ->
674
            TranslatedError = translate_to_log_context(Error),
35✔
675
            ?SLOG(
35✔
676
                error,
35✔
677
                maps:merge(
678
                    #{msg => <<"postgresql_init_prepare_statement_failed">>},
679
                    TranslatedError
680
                )
×
681
            ),
682
            %% mark the prepares failed
683
            State#{prepares => {error, export_error(TranslatedError)}}
35✔
684
    end.
685

686
prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
687
    prepare_sql(maps:to_list(Templates), PoolName).
190✔
688

689
prepare_sql(Templates, PoolName) ->
690
    case do_prepare_sql(Templates, PoolName) of
190✔
691
        {ok, _Sts} = Ok ->
692
            %% prepare for reconnect
693
            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
155✔
694
            Ok;
155✔
695
        Error ->
696
            Error
35✔
697
    end.
698

699
%% this callback accepts the arg list provided to
700
%% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
701
%% so ecpool_worker can de-duplicate the callbacks based on the signature.
702
get_reconnect_callback_signature([[{ChannelId, _Template}]]) ->
703
    ChannelId.
2,033✔
704

705
do_prepare_sql(Templates, PoolName) ->
706
    do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).
190✔
707

708
do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
709
    {ok, Conn} = ecpool_worker:client(Worker),
1,275✔
710
    case prepare_sql_to_conn(Conn, Templates) of
1,275✔
711
        {ok, Sts} ->
712
            do_prepare_sql(Rest, Templates, Sts);
1,240✔
713
        Error ->
714
            Error
35✔
715
    end;
716
do_prepare_sql([], _Prepares, LastSts) ->
717
    {ok, LastSts}.
155✔
718

719
prepare_sql_to_conn(Conn, Prepares) ->
720
    prepare_sql_to_conn(Conn, Prepares, #{}, 0).
1,275✔
721

722
prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) ->
723
    {ok, Statements};
1,240✔
724
prepare_sql_to_conn(Conn, [{_Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) ->
725
    failed_to_remove_prev_prepared_statement_error();
16✔
726
prepare_sql_to_conn(
727
    Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts
728
) when is_pid(Conn) ->
729
    LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
1,299✔
730
    ?SLOG(info, LogMeta),
1,299✔
731
    case epgsql:parse2(Conn, Key, SQL, []) of
1,299✔
732
        {ok, Statement} ->
733
            prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0);
1,240✔
734
        {error, #error{severity = error, codename = undefined_table} = Error} ->
735
            %% Target table is not created
736
            ?tp(pgsql_undefined_table, #{}),
18✔
737
            LogMsg =
18✔
738
                maps:merge(
739
                    LogMeta#{msg => "postgresql_parse_failed"},
740
                    translate_to_log_context(Error)
741
                ),
742
            ?SLOG(error, LogMsg),
18✔
743
            {error, undefined_table};
18✔
744
        {error, #error{severity = error, codename = duplicate_prepared_statement}} = Error ->
745
            ?tp(pgsql_prepared_statement_exists, #{}),
40✔
746
            LogMsg =
40✔
747
                maps:merge(
748
                    LogMeta#{
749
                        msg => "postgresql_prepared_statment_with_same_name_already_exists",
750
                        explain => <<
751
                            "A prepared statement with the same name already "
752
                            "exists in the driver. Will attempt to remove the "
753
                            "previous prepared statement with the name and then "
754
                            "try again."
755
                        >>
756
                    },
757
                    translate_to_log_context(Error)
758
                ),
759
            ?SLOG(warning, LogMsg),
40✔
760
            case epgsql:close(Conn, statement, Key) of
40✔
761
                ok ->
762
                    ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}),
40✔
763
                    prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1);
40✔
764
                {error, CloseError} ->
765
                    ?SLOG(error, #{msg => "pqsql_close_statement_failed", cause => CloseError}),
×
766
                    failed_to_remove_prev_prepared_statement_error()
×
767
            end;
768
        {error, Error} ->
769
            TranslatedError = translate_to_log_context(Error),
1✔
770
            LogMsg =
1✔
771
                maps:merge(
772
                    LogMeta#{msg => "postgresql_parse_failed"},
773
                    TranslatedError
774
                ),
775
            ?SLOG(error, LogMsg),
1✔
776
            {error, export_error(TranslatedError)}
1✔
777
    end.
778

779
failed_to_remove_prev_prepared_statement_error() ->
780
    Msg =
16✔
781
        ("A previous prepared statement for the action already exists "
782
        "but cannot be closed. Please, try to disable and then enable "
783
        "the connector to resolve this issue."),
784
    {error, unicode:characters_to_binary(Msg)}.
16✔
785

786
to_bin(Bin) when is_binary(Bin) ->
787
    Bin;
945✔
788
to_bin(Atom) when is_atom(Atom) ->
789
    erlang:atom_to_binary(Atom).
4✔
790

791
handle_result({error, {recoverable_error, _Error}} = Res) ->
792
    Res;
×
793
handle_result({error, {unrecoverable_error, _Error}} = Res) ->
794
    Res;
4✔
795
handle_result({error, disconnected}) ->
796
    {error, {recoverable_error, disconnected}};
6✔
797
handle_result({error, Error}) ->
798
    TranslatedError = translate_to_log_context(Error),
8✔
799
    {error, {unrecoverable_error, export_error(TranslatedError)}};
8✔
800
handle_result(Res) ->
801
    Res.
185✔
802

803
on_format_query_result({ok, Cnt}) when is_integer(Cnt) ->
804
    #{result => ok, affected_rows => Cnt};
×
805
on_format_query_result(Res) ->
806
    Res.
×
807

808
handle_batch_result([{ok, Count} | Rest], Acc) ->
809
    handle_batch_result(Rest, Acc + Count);
88✔
810
handle_batch_result([{error, Error} | _Rest], _Acc) ->
811
    TranslatedError = translate_to_log_context(Error),
4✔
812
    {error, {unrecoverable_error, export_error(TranslatedError)}};
4✔
813
handle_batch_result([], Acc) ->
814
    ?tp("postgres_success_batch_result", #{row_count => Acc}),
34✔
815
    {ok, Acc}.
34✔
816

817
translate_to_log_context({error, Reason}) ->
818
    translate_to_log_context(Reason);
75✔
819
translate_to_log_context(#error{} = Reason) ->
820
    #error{
67✔
821
        severity = Severity,
822
        code = Code,
823
        codename = Codename,
824
        message = Message,
825
        extra = Extra
826
    } = Reason,
827
    #{
67✔
828
        driver_severity => Severity,
829
        driver_error_codename => Codename,
830
        driver_error_code => Code,
831
        driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
832
        driver_error_extra => Extra
833
    };
834
translate_to_log_context(Reason) ->
835
    #{reason => Reason}.
53✔
836

837
export_error(#{
838
    driver_severity := Severity,
839
    driver_error_codename := Codename,
840
    driver_error_code := Code
841
}) ->
842
    %% Extra information has already been logged.
843
    #{
9✔
844
        error_code => Code,
845
        error_codename => Codename,
846
        severity => Severity
847
    };
848
export_error(#{reason := Reason}) ->
849
    Reason;
53✔
850
export_error(Error) ->
851
    Error.
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc