• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 15284878858

27 May 2025 08:18PM UTC coverage: 82.486%. First build
15284878858

Pull #15275

github

web-flow
Merge 7b73ebade into 5482fd340
Pull Request #15275: fix(postgres): reconnect after health check failures, return more info (port of #15274 to `release-58`)

12 of 15 new or added lines in 1 file covered. (80.0%)

57873 of 70161 relevant lines covered (82.49%)

15023.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.17
/apps/emqx_postgresql/src/emqx_postgresql.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_postgresql).
17

18
-include("emqx_postgresql.hrl").
19
-include_lib("emqx_resource/include/emqx_resource.hrl").
20
-include_lib("emqx_connector/include/emqx_connector.hrl").
21
-include_lib("typerefl/include/types.hrl").
22
-include_lib("emqx/include/logger.hrl").
23
-include_lib("hocon/include/hoconsc.hrl").
24
-include_lib("epgsql/include/epgsql.hrl").
25
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
26

27
-export([roots/0, fields/1, namespace/0]).
28

29
-behaviour(emqx_resource).
30

31
%% callbacks of behaviour emqx_resource
32
-export([
33
    resource_type/0,
34
    callback_mode/0,
35
    on_start/2,
36
    on_stop/2,
37
    on_query/3,
38
    on_batch_query/3,
39
    on_get_status/2,
40
    on_add_channel/4,
41
    on_remove_channel/3,
42
    on_get_channels/1,
43
    on_get_channel_status/3,
44
    on_format_query_result/1
45
]).
46

47
-export([connect/1]).
48

49
-export([
50
    query/3,
51
    prepared_query/3,
52
    execute_batch/3
53
]).
54

55
-export([disable_prepared_statements/0]).
56

57
%% for ecpool workers usage
58
-export([do_get_status/1, prepare_sql_to_conn/2, get_reconnect_callback_signature/1]).
59

60
-define(PGSQL_HOST_OPTIONS, #{
61
    default_port => ?PGSQL_DEFAULT_PORT
62
}).
63

64
-type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
65
-type state() ::
66
    #{
67
        pool_name := binary(),
68
        query_templates := #{binary() => template()},
69
        prepares := disabled | #{binary() => epgsql:statement()} | {error, _}
70
    }.
71

72
%% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
73
%% We want to be able to call sync if any message from the backend leaves the driver in an
74
%% inconsistent state needing sync.
75
-dialyzer({nowarn_function, [execute_batch/3]}).
76

77
%%=====================================================================
78

79
namespace() -> postgres.
×
80

81
roots() ->
82
    [{config, #{type => hoconsc:ref(?MODULE, config)}}].
2✔
83

84
fields(config) ->
85
    [
86
        {server, server()},
87
        disable_prepared_statements()
88
    ] ++
2,607✔
89
        adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
90
        emqx_connector_schema_lib:ssl_fields() ++
91
        emqx_connector_schema_lib:prepare_statement_fields().
92

93
server() ->
94
    Meta = #{desc => ?DESC("server")},
2,607✔
95
    emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
2,607✔
96

97
disable_prepared_statements() ->
98
    {disable_prepared_statements,
43,307✔
99
        hoconsc:mk(
100
            boolean(),
101
            #{
102
                default => false,
103
                required => false,
104
                desc => ?DESC("disable_prepared_statements")
105
            }
106
        )}.
107

108
%% Rewrites the shared relational-DB schema fields for PostgreSQL: the
%% `username' field is overridden to be required, every other field is
%% returned unchanged and in its original position.
adjust_fields(Fields) ->
    [
        case Field of
            {username, Schema} ->
                %% The type is restated in the override only to please dialyzer.
                Type = hocon_schema:field_schema(Schema, type),
                {username, hocon_schema:override(Schema, #{type => Type, required => true})};
            _Other ->
                Field
        end
     || Field <- Fields
    ].
120

121
%% ===================================================================
122
resource_type() -> pgsql.
240✔
123

124
callback_mode() -> always_sync.
240✔
125

126
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
127
on_start(
128
    InstId,
129
    #{
130
        server := Server,
131
        disable_prepared_statements := DisablePreparedStatements,
132
        database := DB,
133
        username := User,
134
        pool_size := PoolSize,
135
        ssl := SSL
136
    } = Config
137
) ->
138
    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?PGSQL_HOST_OPTIONS),
240✔
139
    ?SLOG(info, #{
240✔
140
        msg => "starting_postgresql_connector",
141
        connector => InstId,
142
        config => emqx_utils:redact(Config)
143
    }),
240✔
144
    SslOpts =
240✔
145
        case maps:get(enable, SSL) of
146
            true ->
147
                [
79✔
148
                    %% note: this is converted to `required' in
149
                    %% `conn_opts/2', and there's a boolean guard
150
                    %% there; if this is set to `required' here,
151
                    %% that'll require changing `conn_opts/2''s guard.
152
                    {ssl, true},
153
                    {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
154
                ];
155
            false ->
156
                [{ssl, false}]
161✔
157
        end,
158
    Options = [
240✔
159
        {host, Host},
160
        {port, Port},
161
        {username, User},
162
        {password, maps:get(password, Config, emqx_secret:wrap(""))},
163
        {database, DB},
164
        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
165
        {pool_size, PoolSize}
166
    ],
167
    State1 = parse_sql_template(Config, <<"send_message">>),
240✔
168
    State2 = State1#{installed_channels => #{}},
240✔
169
    case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
240✔
170
        ok ->
171
            Prepares =
207✔
172
                case DisablePreparedStatements of
173
                    true -> disabled;
28✔
174
                    false -> #{}
179✔
175
                end,
176
            case init_prepare(State2#{pool_name => InstId, prepares => Prepares}) of
207✔
177
                #{prepares := {error, _} = Error} ->
178
                    Error;
2✔
179
                State ->
180
                    {ok, State}
205✔
181
            end;
182
        {error, Reason} ->
183
            ?tp(
33✔
184
                pgsql_connector_start_failed,
185
                #{error => Reason}
186
            ),
187
            {error, Reason}
33✔
188
    end.
189

190
on_stop(InstId, State) ->
191
    ?SLOG(info, #{
205✔
192
        msg => "stopping_postgresql_connector",
193
        connector => InstId
194
    }),
205✔
195
    close_connections(State),
205✔
196
    Res = emqx_resource_pool:stop(InstId),
205✔
197
    ?tp(postgres_stopped, #{instance_id => InstId}),
205✔
198
    Res.
205✔
199

200
close_connections(#{pool_name := PoolName} = _State) ->
201
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
205✔
202
    close_connections_with_worker_pids(WorkerPids),
205✔
203
    ok.
205✔
204

205
%% Best-effort close of the epgsql connection held by every given pool worker.
%% A worker that has no client, or whose client lookup raises, is skipped:
%% any such failure most likely means the connection is closed already.
close_connections_with_worker_pids(WorkerPids) ->
    CloseOne = fun(WorkerPid) ->
        try ecpool_worker:client(WorkerPid) of
            {ok, Conn} ->
                _ = epgsql:close(Conn),
                ok;
            _NoClient ->
                ok
        catch
            _:_ ->
                ok
        end
    end,
    lists:foreach(CloseOne, WorkerPids).
205✔
220

221
%% Installs a channel (action) on the connector.
%%
%% The channel state is built by `create_channel_state/3', which parses the
%% channel's SQL template and tries to prepare it on the pool. If preparing
%% failed, `{error, {unhealthy_target, Reason}}' is returned and the connector
%% state is left as it was; otherwise the channel state is stored under
%% `ChannelId' in `installed_channels'.
on_add_channel(
    _InstId,
    #{installed_channels := InstalledChannels} = OldState,
    ChannelId,
    ChannelConfig
) ->
    case create_channel_state(ChannelId, OldState, ChannelConfig) of
        {ok, #{prepares := {error, Reason}}} ->
            {error, {unhealthy_target, Reason}};
        {ok, ChannelState} ->
            Updated = InstalledChannels#{ChannelId => ChannelState},
            {ok, OldState#{installed_channels => Updated}}
    end.
240

241
create_channel_state(
242
    ChannelId,
243
    #{
244
        pool_name := PoolName,
245
        prepares := Prepares
246
    } = _ConnectorState,
247
    #{parameters := Parameters} = _ChannelConfig
248
) ->
249
    State1 = parse_sql_template(Parameters, ChannelId),
176✔
250
    {ok,
176✔
251
        init_prepare(State1#{
252
            pool_name => PoolName,
253
            prepares => Prepares,
254
            prepare_statement => #{}
255
        })}.
256

257
on_remove_channel(
258
    _InstId,
259
    #{
260
        installed_channels := InstalledChannels
261
    } = OldState,
262
    ChannelId
263
) ->
264
    %% Close prepared statements
265
    ok = close_prepared_statement(ChannelId, OldState),
133✔
266
    NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
133✔
267
    %% Update state
268
    NewState = OldState#{installed_channels => NewInstalledChannels},
133✔
269
    {ok, NewState}.
133✔
270

271
close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
272
    ok;
24✔
273
close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
274
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
109✔
275
    close_prepared_statement(WorkerPids, ChannelId, State),
109✔
276
    ok.
109✔
277

278
close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
279
    %% We ignore errors since any error probably means that the
280
    %% prepared statement doesn't exist. If it exists when we try
281
    %% to insert one with the same name, we will try to remove it
282
    %% again anyway.
283
    try ecpool_worker:client(WorkerPid) of
872✔
284
        {ok, Conn} ->
285
            ok = ecpool_worker:remove_reconnect_callback_by_signature(WorkerPid, ChannelId),
792✔
286
            case get_templated_statement(ChannelId, State) of
792✔
287
                {ok, Statement} ->
288
                    _ = epgsql:close(Conn, Statement),
792✔
289
                    close_prepared_statement(Rest, ChannelId, State);
792✔
290
                error ->
291
                    %% channel was not added
292
                    ok
×
293
            end;
294
        _ ->
295
            close_prepared_statement(Rest, ChannelId, State)
80✔
296
    catch
297
        _:_ ->
298
            close_prepared_statement(Rest, ChannelId, State)
×
299
    end;
300
close_prepared_statement([], _ChannelId, _State) ->
301
    ok.
109✔
302

303
on_get_channel_status(
304
    _ResId,
305
    ChannelId,
306
    #{
307
        pool_name := PoolName,
308
        installed_channels := Channels
309
    } = _State
310
) ->
311
    ChannelState = maps:get(ChannelId, Channels),
214✔
312
    case
214✔
313
        do_check_channel_sql(
314
            PoolName,
315
            ChannelId,
316
            ChannelState
317
        )
318
    of
319
        ok ->
320
            ?status_connected;
204✔
321
        {error, undefined_table} ->
322
            {?status_disconnected, {unhealthy_target, <<"Table does not exist">>}}
8✔
323
    end.
324

325
do_check_channel_sql(
326
    PoolName,
327
    ChannelId,
328
    #{query_templates := ChannelQueryTemplates} = _ChannelState
329
) ->
330
    {SQL, _RowTemplate} = maps:get(ChannelId, ChannelQueryTemplates),
214✔
331
    WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
214✔
332
    validate_table_existence(WorkerPids, SQL).
214✔
333

334
on_get_channels(ResId) ->
335
    emqx_bridge_v2:get_channels_for_connector(ResId).
911✔
336

337
-spec on_query
338
    %% Called from authn and authz modules
339
    (connector_resource_id(), {prepared_query, binary(), [term()]}, state()) ->
340
        {ok, _} | {error, term()};
341
    %% Called from bridges
342
    (connector_resource_id(), {action_resource_id(), map()}, state()) ->
343
        {ok, _} | {error, term()}.
344
on_query(InstId, {TypeOrKey, NameOrMap}, State) ->
345
    on_query(InstId, {TypeOrKey, NameOrMap, []}, State);
98✔
346
on_query(
347
    InstId,
348
    {TypeOrKey, NameOrMap, Params},
349
    #{pool_name := PoolName} = State
350
) ->
351
    ?TRACE("QUERY", "postgresql_connector_received_sql_query", #{
204✔
352
        connector => InstId,
353
        type => TypeOrKey,
354
        sql => NameOrMap,
355
        state => State
356
    }),
204✔
357
    {QueryType, NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrMap, Params, State),
204✔
358
    emqx_trace:rendered_action_template(
204✔
359
        TypeOrKey,
360
        #{
361
            statement_type => QueryType,
362
            statement_or_name => NameOrSQL2,
363
            data => Data
364
        }
365
    ),
366
    Res = on_sql_query(InstId, PoolName, QueryType, NameOrSQL2, Data),
204✔
367
    ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
201✔
368
    handle_result(Res).
201✔
369

370
on_batch_query(
371
    InstId,
372
    [{Key, _} = Request | _] = BatchReq,
373
    #{pool_name := PoolName} = State
374
) ->
375
    BinKey = to_bin(Key),
46✔
376
    case get_template(BinKey, State) of
46✔
377
        undefined ->
378
            Log = #{
4✔
379
                connector => InstId,
380
                first_request => Request,
381
                state => State,
382
                msg => "batch prepare not implemented"
383
            },
384
            ?SLOG(error, Log),
4✔
385
            {error, {unrecoverable_error, batch_prepare_not_implemented}};
4✔
386
        {_Statement, RowTemplate} ->
387
            {ok, StatementTemplate} = get_templated_statement(BinKey, State),
42✔
388
            Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
42✔
389
            emqx_trace:rendered_action_template(
42✔
390
                Key,
391
                #{
392
                    statement_type => execute_batch,
393
                    statement_or_name => StatementTemplate,
394
                    data => Rows
395
                }
396
            ),
397
            case on_sql_query(InstId, PoolName, execute_batch, StatementTemplate, Rows) of
42✔
398
                {error, _Error} = Result ->
399
                    handle_result(Result);
4✔
400
                {_Column, Results} ->
401
                    handle_batch_result(Results, 0)
38✔
402
            end
403
    end;
404
on_batch_query(InstId, BatchReq, State) ->
405
    ?SLOG(error, #{
4✔
406
        connector => InstId,
407
        request => BatchReq,
408
        state => State,
409
        msg => "invalid request"
410
    }),
×
411
    {error, {unrecoverable_error, invalid_request}}.
4✔
412

413
proc_sql_params(ActionResId, #{} = Map, [], State) when is_binary(ActionResId) ->
414
    %% When this connector is called from actions/bridges.
415
    DisablePreparedStatements = prepared_statements_disabled(State),
32✔
416
    {ExprTemplate, RowTemplate} = get_template(ActionResId, State),
32✔
417
    Rendered = render_prepare_sql_row(RowTemplate, Map),
32✔
418
    case DisablePreparedStatements of
32✔
419
        true ->
420
            {query, ExprTemplate, Rendered};
6✔
421
        false ->
422
            {prepared_query, ActionResId, Rendered}
26✔
423
    end;
424
proc_sql_params(prepared_query, ConnResId, Params, State) ->
425
    %% When this connector is called from authn/authz modules
426
    DisablePreparedStatements = prepared_statements_disabled(State),
79✔
427
    case DisablePreparedStatements of
79✔
428
        true ->
429
            #{query_templates := #{ConnResId := {ExprTemplate, _VarsTemplate}}} = State,
9✔
430
            {query, ExprTemplate, Params};
9✔
431
        false ->
432
            %% Connector resource id itself is the prepared statement name
433
            {prepared_query, ConnResId, Params}
70✔
434
    end;
435
proc_sql_params(QueryType, SQL, Params, _State) when
436
    is_atom(QueryType) andalso
437
        (is_binary(SQL) orelse is_list(SQL)) andalso
438
        is_list(Params)
439
->
440
    %% When called to do ad-hoc commands/queries.
441
    {QueryType, SQL, Params}.
93✔
442

443
%% True only when the state carries `prepares => disabled'. A state without a
%% `prepares' key is treated as having prepared statements enabled.
prepared_statements_disabled(State) ->
    case maps:get(prepares, State, undefined) of
        disabled -> true;
        _Enabled -> false
    end.
111✔
445

446
%% Looks up the `{Statement, RowTemplate}' pair registered under `Key'.
%%
%% `Key' may be a binary or an atom; it is normalised to a binary *before* any
%% lookup, so the channel-membership test and the subsequent lookups always
%% use the same key. (Previously the membership test used the raw key, so an
%% atom naming an installed channel skipped the channel templates.)
%%
%% Resolution order:
%%   1. the templates of the installed channel named `Key', if there is one
%%      (crashes with `badkey' if that channel has no template for `Key');
%%   2. otherwise the connector-level `query_templates', yielding `undefined'
%%      when the key is not present there either.
get_template(Key, State) ->
    BinKey = to_bin(Key),
    case State of
        #{installed_channels := #{BinKey := #{query_templates := ChannelTemplates}}} ->
            maps:get(BinKey, ChannelTemplates);
        #{query_templates := Templates} ->
            maps:get(BinKey, Templates, undefined)
    end.
4✔
454

455
get_templated_statement(Key, #{installed_channels := Channels} = _State) ->
456
    BinKey = to_bin(Key),
834✔
457
    case is_map_key(BinKey, Channels) of
834✔
458
        true ->
459
            ChannelState = maps:get(BinKey, Channels),
834✔
460
            case ChannelState of
834✔
461
                #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
462
                    {ok, ExprTemplate};
6✔
463
                #{prepares := #{BinKey := ExprTemplate}} ->
464
                    {ok, ExprTemplate}
828✔
465
            end;
466
        false ->
467
            error
×
468
    end;
469
get_templated_statement(Key, #{prepares := PrepStatements}) ->
470
    BinKey = to_bin(Key),
×
471
    {ok, maps:get(BinKey, PrepStatements)}.
×
472

473
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
474
    try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
246✔
475
        {error, Reason} ->
476
            ?tp(
17✔
477
                pgsql_connector_query_return,
478
                #{error => Reason}
479
            ),
480
            TranslatedError = translate_to_log_context(Reason),
17✔
481
            ?SLOG(
17✔
482
                error,
17✔
483
                maps:merge(
484
                    #{
485
                        msg => "postgresql_connector_do_sql_query_failed",
486
                        connector => InstId,
487
                        type => Type,
488
                        sql => NameOrSQL
489
                    },
490
                    TranslatedError
491
                )
×
492
            ),
493
            case Reason of
17✔
494
                sync_required ->
495
                    {error, {recoverable_error, Reason}};
×
496
                ecpool_empty ->
497
                    {error, {recoverable_error, Reason}};
×
498
                {error, error, _, undefined_table, _, _} ->
499
                    {error, {unrecoverable_error, export_error(TranslatedError)}};
×
500
                _ ->
501
                    {error, export_error(TranslatedError)}
17✔
502
            end;
503
        Result ->
504
            ?tp(
222✔
505
                pgsql_connector_query_return,
506
                #{result => Result}
507
            ),
508
            Result
222✔
509
    catch
510
        error:function_clause:Stacktrace ->
511
            ?SLOG(error, #{
4✔
512
                msg => "postgresql_connector_do_sql_query_failed",
513
                connector => InstId,
514
                type => Type,
515
                sql => NameOrSQL,
516
                reason => function_clause,
517
                stacktrace => Stacktrace
518
            }),
×
519
            {error, {unrecoverable_error, invalid_request}}
4✔
520
    end.
521

522
%% Connector health check.
%%
%% Runs `do_get_status/1' on every pool worker and maps the outcome to a
%% resource status:
%%   * no workers reported at all            -> connecting;
%%   * every worker answered `{ok, _, _}'    -> result of the prepares check;
%%   * at least one worker answered otherwise -> disconnected, with the first
%%     such answer (unwrapped from `{error, _}' when applicable) as reason;
%%   * the check timed out                   -> disconnected.
on_get_status(_InstId, #{pool_name := PoolName} = ConnState) ->
    HealthCheck = emqx_resource_pool:health_check_workers(
        PoolName,
        fun ?MODULE:do_get_status/1,
        emqx_resource_pool:health_check_timeout(),
        #{return_values => true}
    ),
    case HealthCheck of
        {error, timeout} ->
            %% Declaring the connector `?status_disconnected' triggers a full
            %% reconnection. We choose to do this on a health check timeout
            %% because there have been issues where the connection process
            %% does not die although the connection itself is unusable.
            {?status_disconnected, <<"health_check_timeout">>};
        {ok, []} ->
            {?status_connecting, <<"connection_pool_not_initialized">>};
        {ok, Results} ->
            IsHealthy = fun
                ({ok, _, _}) -> true;
                (_) -> false
            end,
            %% Skip the leading healthy answers; whatever comes first in the
            %% remainder is the first failing worker result.
            case lists:dropwhile(IsHealthy, Results) of
                [] -> on_get_status_prepares(ConnState);
                [{error, Reason} | _] -> {?status_disconnected, Reason};
                [Unexpected | _] -> {?status_disconnected, Unexpected}
            end
    end.
555

556
on_get_status_prepares(ConnState) ->
557
    case do_check_prepares(ConnState) of
457✔
558
        ok ->
559
            ?status_connected;
457✔
560
        {error, undefined_table} ->
561
            %% return error indicating that we are connected but the target table
562
            %% is not created
NEW
563
            {?status_disconnected, {unhealthy_target, undefined_table}}
×
564
    end.
565

566
do_get_status(Conn) ->
567
    epgsql:squery(Conn, "SELECT count(1) AS T").
3,694✔
568

569
%% Verifies that the table targeted by the connector-level `send_message'
%% template exists. States that carry no `<<"send_message">>' template (or no
%% pool name) have nothing to verify and are reported as `ok'.
do_check_prepares(#{
    pool_name := PoolName,
    query_templates := #{<<"send_message">> := {SQL, _RowTemplate}}
}) ->
    Workers = [Pid || {_Name, Pid} <- ecpool:workers(PoolName)],
    case validate_table_existence(Workers, SQL) of
        {error, Reason} ->
            {error, Reason};
        ok ->
            ok
    end;
do_check_prepares(_State) ->
    ok.
457✔
584

585
%% Checks that the table referenced by `SQL' exists, by asking the backend to
%% parse the statement under the empty statement name on a pooled connection.
%%
%% Workers are tried in order until one of them gives a conclusive answer:
%%   * a successful parse (any tuple tagged `ok')      -> `ok';
%%   * a driver `#error{}' with codename `undefined_table'
%%                                                     -> `{error, undefined_table}';
%%   * any other parse result, a worker without a client, or a worker process
%%     that is not running                             -> try the next worker.
%%
%% When the list is exhausted without a conclusive answer, `ok' is returned so
%% that the check is simply repeated on the next health check.
-spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
validate_table_existence([WorkerPid | Rest], SQL) ->
    try ecpool_worker:client(WorkerPid) of
        {ok, Conn} ->
            case epgsql:parse2(Conn, "", SQL, []) of
                %% Match on the driver's error record rather than on a
                %% positional tuple, as the rest of this module does.
                {error, #error{codename = undefined_table}} ->
                    {error, undefined_table};
                Res when is_tuple(Res) andalso ok =:= element(1, Res) ->
                    ok;
                Res ->
                    ?tp(postgres_connector_bad_parse2, #{result => Res}),
                    validate_table_existence(Rest, SQL)
            end;
        _NoClient ->
            validate_table_existence(Rest, SQL)
    catch
        %% Only the client lookup above is protected by this handler.
        exit:{noproc, _} ->
            validate_table_existence(Rest, SQL)
    end;
validate_table_existence([], _SQL) ->
    %% No worker gave a conclusive answer: each one was unavailable or replied
    %% with an unexpected result. We will retry on the next health check.
    ok.
×
608

609
%% ===================================================================
610

611
%% Opens one epgsql connection from the pool's option proplist. `host',
%% `username' and `password' are read from `Opts' directly; the remaining
%% driver options are derived from `Opts' by `conn_opts/1'.
connect(Opts) ->
    Host = proplists:get_value(host, Opts),
    Username = proplists:get_value(username, Opts),
    %% TODO: teach `epgsql` to accept 0-arity closures as passwords.
    WrappedPassword = proplists:get_value(password, Opts),
    Password = emqx_secret:unwrap(WrappedPassword),
    case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
        {error, Reason} ->
            {error, Reason};
        {ok, _Conn} = Connected ->
            Connected
    end.
622

623
%% Runs `SQL' with `Params' through `epgsql:equery/3'. If the driver answers
%% `{error, sync_required}', a sync is issued on the connection (and must
%% succeed) before that same error is returned to the caller.
query(Conn, SQL, Params) ->
    Result = epgsql:equery(Conn, SQL, Params),
    case Result of
        {error, sync_required} -> ok = epgsql:sync(Conn);
        _ -> ok
    end,
    Result.
631

632
%% Executes the prepared statement `Name' with `Params' through
%% `epgsql:prepared_query2/3'. If the driver answers `{error, sync_required}',
%% a sync is issued on the connection (and must succeed) before that same
%% error is returned to the caller.
prepared_query(Conn, Name, Params) ->
    Result = epgsql:prepared_query2(Conn, Name, Params),
    case Result of
        {error, sync_required} -> ok = epgsql:sync(Conn);
        _ -> ok
    end,
    Result.
640

641
%% Executes `Statement' once per row of `Params' through
%% `epgsql:execute_batch/3'. If the driver answers `{error, sync_required}',
%% a sync is issued on the connection (and must succeed) before that same
%% error is returned to the caller.
execute_batch(Conn, Statement, Params) ->
    Result = epgsql:execute_batch(Conn, Statement, Params),
    case Result of
        {error, sync_required} -> ok = epgsql:sync(Conn);
        _ -> ok
    end,
    Result.
649

650
%% Reduces the pool's option proplist to the options forwarded to
%% `epgsql:connect/4'.
%%
%% Kept as-is: `database', `port', `timeout' and `ssl_opts'.
%% Translated: a boolean `ssl' flag, where `true' becomes `required' and
%% `false' stays `false'.
%% Dropped: every other entry, including an `ssl' entry that is not a boolean.
%%
%% The retained options come out in reverse input order.
conn_opts(Opts) ->
    conn_opts(Opts, []).

conn_opts([], Acc) ->
    Acc;
conn_opts([{ssl, true} | Opts], Acc) ->
    conn_opts(Opts, [{ssl, required} | Acc]);
conn_opts([{ssl, false} | Opts], Acc) ->
    conn_opts(Opts, [{ssl, false} | Acc]);
conn_opts([{Key, _Value} = Opt | Opts], Acc) when
    Key =:= database; Key =:= port; Key =:= timeout; Key =:= ssl_opts
->
    conn_opts(Opts, [Opt | Acc]);
conn_opts([_Ignored | Opts], Acc) ->
    conn_opts(Opts, Acc).
7,708✔
671

672
parse_sql_template(Config, ChannelId) ->
673
    Queries =
416✔
674
        case Config of
675
            #{prepare_statement := Qs} ->
676
                Qs;
73✔
677
            #{sql := Query} ->
678
                #{ChannelId => Query};
176✔
679
            #{} ->
680
                #{}
167✔
681
        end,
682
    Templates = maps:fold(fun parse_sql_template/3, #{}, Queries),
416✔
683
    #{query_templates => Templates}.
416✔
684

685
parse_sql_template(Key, Query, Acc) ->
686
    Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
249✔
687
    Acc#{Key => Template}.
249✔
688

689
render_prepare_sql_row(RowTemplate, Data) ->
690
    % NOTE: ignoring errors here, missing variables will be replaced with `null`.
691
    {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
128✔
692
    Row.
128✔
693

694
init_prepare(State = #{prepares := disabled}) ->
695
    State;
52✔
696
init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
697
    State;
140✔
698
init_prepare(State = #{}) ->
699
    case prepare_sql(State) of
191✔
700
        {ok, PrepStatements} ->
701
            State#{prepares => PrepStatements};
156✔
702
        Error ->
703
            TranslatedError = translate_to_log_context(Error),
35✔
704
            ?SLOG(
35✔
705
                error,
35✔
706
                maps:merge(
707
                    #{msg => <<"postgresql_init_prepare_statement_failed">>},
708
                    TranslatedError
709
                )
×
710
            ),
711
            %% mark the prepares failed
712
            State#{prepares => {error, export_error(TranslatedError)}}
35✔
713
    end.
714

715
prepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
716
    prepare_sql(maps:to_list(Templates), PoolName).
191✔
717

718
prepare_sql(Templates, PoolName) ->
719
    case do_prepare_sql(Templates, PoolName) of
191✔
720
        {ok, _Sts} = Ok ->
721
            %% prepare for reconnect
722
            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]}),
156✔
723
            Ok;
156✔
724
        Error ->
725
            Error
35✔
726
    end.
727

728
%% this callback accepts the arg list provided to
729
%% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
730
%% so ecpool_worker can de-duplicate the callbacks based on the signature.
731
get_reconnect_callback_signature([[{ChannelId, _Template}]]) ->
732
    ChannelId.
2,049✔
733

734
do_prepare_sql(Templates, PoolName) ->
735
    do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).
191✔
736

737
do_prepare_sql([{_Name, Worker} | Rest], Templates, _LastSts) ->
738
    {ok, Conn} = ecpool_worker:client(Worker),
1,283✔
739
    case prepare_sql_to_conn(Conn, Templates) of
1,283✔
740
        {ok, Sts} ->
741
            do_prepare_sql(Rest, Templates, Sts);
1,248✔
742
        Error ->
743
            Error
35✔
744
    end;
745
do_prepare_sql([], _Prepares, LastSts) ->
746
    {ok, LastSts}.
156✔
747

748
prepare_sql_to_conn(Conn, Prepares) ->
749
    prepare_sql_to_conn(Conn, Prepares, #{}, 0).
1,283✔
750

751
prepare_sql_to_conn(Conn, [], Statements, _Attempts) when is_pid(Conn) ->
752
    {ok, Statements};
1,248✔
753
prepare_sql_to_conn(Conn, [{_Key, _} | _Rest], _Statements, _MaxAttempts = 2) when is_pid(Conn) ->
754
    failed_to_remove_prev_prepared_statement_error();
16✔
755
prepare_sql_to_conn(
756
    Conn, [{Key, {SQL, _RowTemplate}} | Rest] = ToPrepare, Statements, Attempts
757
) when is_pid(Conn) ->
758
    LogMeta = #{msg => "postgresql_prepare_statement", name => Key, sql => SQL},
1,307✔
759
    ?SLOG(info, LogMeta),
1,307✔
760
    case epgsql:parse2(Conn, Key, SQL, []) of
1,307✔
761
        {ok, Statement} ->
762
            prepare_sql_to_conn(Conn, Rest, Statements#{Key => Statement}, 0);
1,248✔
763
        {error, #error{severity = error, codename = undefined_table} = Error} ->
764
            %% Target table is not created
765
            ?tp(pgsql_undefined_table, #{}),
18✔
766
            LogMsg =
18✔
767
                maps:merge(
768
                    LogMeta#{msg => "postgresql_parse_failed"},
769
                    translate_to_log_context(Error)
770
                ),
771
            ?SLOG(error, LogMsg),
18✔
772
            {error, undefined_table};
18✔
773
        {error, #error{severity = error, codename = duplicate_prepared_statement}} = Error ->
774
            ?tp(pgsql_prepared_statement_exists, #{}),
40✔
775
            LogMsg =
40✔
776
                maps:merge(
777
                    LogMeta#{
778
                        msg => "postgresql_prepared_statment_with_same_name_already_exists",
779
                        explain => <<
780
                            "A prepared statement with the same name already "
781
                            "exists in the driver. Will attempt to remove the "
782
                            "previous prepared statement with the name and then "
783
                            "try again."
784
                        >>
785
                    },
786
                    translate_to_log_context(Error)
787
                ),
788
            ?SLOG(warning, LogMsg),
40✔
789
            case epgsql:close(Conn, statement, Key) of
40✔
790
                ok ->
791
                    ?SLOG(info, #{msg => "pqsql_closed_statement_successfully"}),
40✔
792
                    prepare_sql_to_conn(Conn, ToPrepare, Statements, Attempts + 1);
40✔
793
                {error, CloseError} ->
794
                    ?SLOG(error, #{msg => "pqsql_close_statement_failed", cause => CloseError}),
×
795
                    failed_to_remove_prev_prepared_statement_error()
×
796
            end;
797
        {error, Error} ->
798
            TranslatedError = translate_to_log_context(Error),
1✔
799
            LogMsg =
1✔
800
                maps:merge(
801
                    LogMeta#{msg => "postgresql_parse_failed"},
802
                    TranslatedError
803
                ),
804
            ?SLOG(error, LogMsg),
1✔
805
            {error, export_error(TranslatedError)}
1✔
806
    end.
807

808
%% Error returned when a prepared statement with the same name already exists
%% on the connection and could not be replaced. The reason is a binary with
%% user-facing remediation advice.
failed_to_remove_prev_prepared_statement_error() ->
    {error, <<
        "A previous prepared statement for the action already exists "
        "but cannot be closed. Please, try to disable and then enable "
        "the connector to resolve this issue."
    >>}.
16✔
814

815
%% Normalises a key to a binary: atoms are converted using UTF-8, binaries are
%% returned untouched. Any other term is rejected with `function_clause'.
to_bin(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom, utf8);
to_bin(Bin) when is_binary(Bin) ->
    Bin.
4✔
819

820
handle_result({error, {recoverable_error, _Error}} = Res) ->
821
    Res;
×
822
handle_result({error, {unrecoverable_error, _Error}} = Res) ->
823
    Res;
4✔
824
handle_result({error, disconnected}) ->
825
    {error, {recoverable_error, disconnected}};
8✔
826
handle_result({error, #{reason := bad_param} = Context}) ->
827
    ?tp("postgres_bad_param_error", #{context => Context}),
1✔
828
    {error, {unrecoverable_error, Context}};
1✔
829
handle_result({error, Error}) ->
830
    TranslatedError = translate_to_log_context(Error),
8✔
831
    {error, {unrecoverable_error, export_error(TranslatedError)}};
8✔
832
handle_result(Res) ->
833
    Res.
184✔
834

835
%% Formats a query result for presentation: `{ok, Count}' with an integer
%% count becomes `#{result => ok, affected_rows => Count}'; every other
%% result is passed through unchanged.
on_format_query_result(Res) ->
    case Res of
        {ok, Cnt} when is_integer(Cnt) ->
            #{result => ok, affected_rows => Cnt};
        _ ->
            Res
    end.
×
839

840
handle_batch_result([{ok, Count} | Rest], Acc) ->
841
    handle_batch_result(Rest, Acc + Count);
88✔
842
handle_batch_result([{error, Error} | _Rest], _Acc) ->
843
    TranslatedError = translate_to_log_context(Error),
4✔
844
    {error, {unrecoverable_error, export_error(TranslatedError)}};
4✔
845
handle_batch_result([], Acc) ->
846
    ?tp("postgres_success_batch_result", #{row_count => Acc}),
34✔
847
    {ok, Acc}.
34✔
848

849
translate_to_log_context({error, Reason}) ->
850
    translate_to_log_context(Reason);
75✔
851
translate_to_log_context(#error{} = Reason) ->
852
    #error{
67✔
853
        severity = Severity,
854
        code = Code,
855
        codename = Codename,
856
        message = Message,
857
        extra = Extra
858
    } = Reason,
859
    #{
67✔
860
        driver_severity => Severity,
861
        driver_error_codename => Codename,
862
        driver_error_code => Code,
863
        driver_error_message => emqx_logger_textfmt:try_format_unicode(Message),
864
        driver_error_extra => Extra
865
    };
866
translate_to_log_context(Reason) ->
867
    #{reason => Reason}.
56✔
868

869
%% Reduces a log context (as produced by `translate_to_log_context/1') to the
%% term handed back to callers:
%%   * a driver error context keeps only code, codename and severity, since
%%     the extra information has already been logged;
%%   * a `#{reason := Reason}' context is unwrapped to `Reason';
%%   * anything else is returned unchanged.
export_error(Error) ->
    case Error of
        #{
            driver_severity := Severity,
            driver_error_codename := Codename,
            driver_error_code := Code
        } ->
            #{
                error_code => Code,
                error_codename => Codename,
                severity => Severity
            };
        #{reason := Reason} ->
            Reason;
        _ ->
            Error
    end.
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc