• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 6827041604

10 Nov 2023 04:03PM UTC coverage: 82.664% (-0.08%) from 82.742%
6827041604

push

github

web-flow
Merge pull request #11928 from thalesmg/fix-push-entrypoint-r53-20231110

ci: trigger push entrypoint for `release-5[0-9]`

35482 of 42923 relevant lines covered (82.66%)

6533.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.8
/apps/emqx_mysql/src/emqx_mysql.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16
-module(emqx_mysql).
17

18
-include_lib("emqx_connector/include/emqx_connector.hrl").
19
-include_lib("typerefl/include/types.hrl").
20
-include_lib("hocon/include/hoconsc.hrl").
21
-include_lib("emqx/include/logger.hrl").
22
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
23

24
-behaviour(emqx_resource).
25

26
%% callbacks of behaviour emqx_resource
27
-export([
28
    callback_mode/0,
29
    on_start/2,
30
    on_stop/2,
31
    on_query/3,
32
    on_batch_query/3,
33
    on_get_status/2
34
]).
35

36
%% ecpool connect & reconnect
37
-export([connect/1, prepare_sql_to_conn/2]).
38

39
-export([prepare_sql/2]).
40

41
-export([roots/0, fields/1]).
42

43
-export([do_get_status/1]).
44

45
-define(MYSQL_HOST_OPTIONS, #{
46
    default_port => ?MYSQL_DEFAULT_PORT
47
}).
48

49
-type prepares() :: #{atom() => binary()}.
50
-type params_tokens() :: #{atom() => list()}.
51
-type sqls() :: #{atom() => binary()}.
52
-type state() ::
53
    #{
54
        pool_name := binary(),
55
        prepare_statement := prepares(),
56
        params_tokens := params_tokens(),
57
        batch_inserts := sqls(),
58
        batch_params_tokens := params_tokens()
59
    }.
60

61
%%=====================================================================
62
%% Hocon schema
63
%% Schema roots: a single `config` root whose type is a reference to the
%% `config` struct declared by this module (see fields/1).
roots() ->
    ConfigType = hoconsc:ref(?MODULE, config),
    [{config, #{type => ConfigType}}].
2✔
65

66
%% Fields of the `config` struct, concatenated in this order:
%%   1. the `server` field,
%%   2. the relational DB fields, with the `username` field wrapped so that
%%      it has a default (see add_default_username/2),
%%   3. the SSL fields,
%%   4. the prepare-statement fields.
fields(config) ->
    ServerField = {server, server()},
    DbFields = add_default_username(emqx_connector_schema_lib:relational_db_fields(), []),
    SslFields = emqx_connector_schema_lib:ssl_fields(),
    PrepareFields = emqx_connector_schema_lib:prepare_statement_fields(),
    [ServerField | DbFields] ++ SslFields ++ PrepareFields.
71

72
%% Wraps the schema function of the first `{username, Fn}` entry in `Fields`
%% so that it answers `<<"root">>` when asked for `default`
%% (see add_default_fn/2). All other entries keep their position and value.
%%
%% `Head` is prepended unchanged to the result.
%%
%% The list is split once with lists:splitwith/2, so the work is linear in
%% the number of fields (no repeated `Acc ++ [Field]` appends).
%%
%% Crashes (badmatch) when `Fields` contains no `{username, _}` entry.
add_default_username(Fields, Head) ->
    IsNotUsername = fun
        ({username, _}) -> false;
        (_) -> true
    end,
    {Before, [{username, OrigUsernameFn} | After]} = lists:splitwith(IsNotUsername, Fields),
    Head ++ Before ++ [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | After].
1,690✔
76

77
%% Returns a fun that answers `Default` for the atom `default` and delegates
%% every other argument to `OrigFn`.
add_default_fn(OrigFn, Default) ->
    fun(Field) ->
        case Field of
            default -> Default;
            _ -> OrigFn(Field)
        end
    end.
82

83
%% Schema for the `server` field, built by emqx_schema:servers_sc/2 from the
%% field description and the MySQL host options (default port).
server() ->
    emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MYSQL_HOST_OPTIONS).
845✔
86

87
%% ===================================================================
88
%% emqx_resource callback: this connector always reports `always_sync`.
callback_mode() ->
    always_sync.
170✔
89

90
%% emqx_resource callback: starts the connection pool for resource `InstId`.
%%
%% Steps:
%%   1. parse `server` into host/port,
%%   2. log the (redacted) config,
%%   3. build the connection options; `password` is only added when present
%%      in the config, and the `ssl` option only when SSL is enabled,
%%   4. pre-process the configured SQL templates (parse_prepare_sql/1),
%%   5. start the pool and, on success, prepare the statements on it
%%      (init_prepare/1).
%%
%% Returns `{ok, State}` or `{error, Reason}` when the pool fails to start.
-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
on_start(
    InstId,
    #{
        server := Server,
        database := DB,
        username := Username,
        pool_size := PoolSize,
        ssl := SSL
    } = Config
) ->
    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?MYSQL_HOST_OPTIONS),
    ?SLOG(info, #{
        msg => "starting_mysql_connector",
        connector => InstId,
        config => emqx_utils:redact(Config)
    }),
    TlsOpts =
        case maps:get(enable, SSL) of
            true -> [{ssl, emqx_tls_lib:to_client_opts(SSL)}];
            false -> []
        end,
    BaseOpts = [
        {host, Host},
        {port, Port},
        {user, Username},
        {database, DB},
        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
        {pool_size, PoolSize}
    ],
    PoolOpts = maybe_add_password_opt(maps:get(password, Config, undefined), BaseOpts) ++ TlsOpts,
    Parsed = parse_prepare_sql(Config),
    case emqx_resource_pool:start(InstId, ?MODULE, PoolOpts) of
        ok ->
            {ok, init_prepare(Parsed#{pool_name => InstId})};
        {error, Reason} ->
            ?tp(
                mysql_connector_start_failed,
                #{error => Reason}
            ),
            {error, Reason}
    end.
137

138
%% Prepends `{password, Password}` to `Options` unless the password is
%% `undefined`, in which case `Options` is returned untouched.
maybe_add_password_opt(Password, Options) ->
    case Password of
        undefined -> Options;
        _ -> [{password, Password} | Options]
    end.
158✔
142

143
%% emqx_resource callback: logs the shutdown and stops the pool registered
%% under `InstId`. The resource state is not used.
on_stop(InstId, _State) ->
    LogData = #{msg => "stopping_mysql_connector", connector => InstId},
    ?SLOG(info, LogData),
    emqx_resource_pool:stop(InstId).
144✔
149

150
%% emqx_resource callback: runs a single query.
%%
%% Accepted request shapes:
%%   {TypeOrKey, SQLOrKey}                   - no params, default timeout
%%   {TypeOrKey, SQLOrKey, Params}           - default timeout
%%   {TypeOrKey, SQLOrKey, Params, Timeout}
%%
%% When the query fails with `{error, not_prepared}`, the statements are
%% prepared again (maybe_prepare_sql/3) and, if that succeeds, the same
%% request is retried. If preparing fails, its error is returned instead.
on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
    on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);
on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) ->
    on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, State);
on_query(
    InstId,
    {TypeOrKey, SQLOrKey, Params, Timeout} = Request,
    #{pool_name := PoolName, prepare_statement := Prepares} = State
) ->
    SqlFun = mysql_function(TypeOrKey),
    {ResolvedSQL, Args} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
    QueryResult = on_sql_query(InstId, SqlFun, ResolvedSQL, Args, Timeout, State),
    case QueryResult of
        {error, not_prepared} ->
            case maybe_prepare_sql(ResolvedSQL, Prepares, PoolName) of
                ok ->
                    ?tp(
                        mysql_connector_on_query_prepared_sql,
                        #{type_or_key => TypeOrKey, sql_or_key => SQLOrKey, params => Params}
                    ),
                    %% no result yet: run the very same request again
                    on_query(InstId, Request, State);
                {error, Reason} ->
                    ?tp(
                        error,
                        "mysql_connector_do_prepare_failed",
                        #{
                            connector => InstId,
                            sql => SQLOrKey,
                            state => State,
                            reason => Reason
                        }
                    ),
                    {error, Reason}
            end;
        _ ->
            QueryResult
    end.
187

188
%% emqx_resource callback: runs a batch of requests as one INSERT.
%%
%% Only the first request of the batch is inspected:
%%   * `{Key, _}` with a batch insert template registered under `Key`:
%%     the whole batch is sent through on_batch_insert/5;
%%   * `{Key, _}` without such a template:
%%     `{error, {unrecoverable_error, batch_select_not_implemented}}`;
%%   * any other shape: logged and rejected with
%%     `{error, {unrecoverable_error, invalid_request}}`.
on_batch_query(
    InstId,
    BatchReq,
    #{batch_inserts := Inserts, batch_params_tokens := ParamsTokens} = State
) ->
    FirstReq = hd(BatchReq),
    case FirstReq of
        {Key, _} ->
            InsertSQL = maps:get(Key, Inserts, undefined),
            case InsertSQL of
                undefined ->
                    {error, {unrecoverable_error, batch_select_not_implemented}};
                _ ->
                    Tokens = maps:get(Key, ParamsTokens),
                    on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State)
            end;
        _ ->
            ?SLOG(error, #{
                msg => "invalid request",
                connector => InstId,
                first_request => FirstReq,
                state => State
            }),
            {error, {unrecoverable_error, invalid_request}}
    end.
207

208
%% Maps a request type to the name of the `mysql` function used to run it:
%% `sql` runs through `query`; `prepared_query` and every other key
%% (e.g. the keys used by bridges) run through `execute`.
mysql_function(sql) -> query;
%% for bridge: any other key is treated like `prepared_query`
mysql_function(_) -> execute.
1,774✔
215

216
%% emqx_resource callback: reports the health of the resource.
%%
%% The pool workers are probed with do_get_status/1 first. If that check
%% fails the result is `connecting`. Otherwise the prepared statements are
%% verified (do_check_prepares/1):
%%   ok                                  -> connected
%%   {ok, NState}                        -> {connected, NState}
%%   {error, {undefined_table, NState}}  -> {disconnected, NState, unhealthy_target}
%%   {error, _}                          -> connecting
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
    Healthy = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
    case Healthy of
        false ->
            connecting;
        true ->
            case do_check_prepares(State) of
                ok ->
                    connected;
                {ok, NState} ->
                    %% hand back the state that now carries the prepared statements
                    {connected, NState};
                {error, {undefined_table, NState}} ->
                    {disconnected, NState, unhealthy_target};
                {error, _Reason} ->
                    %% not logged here, prepare_sql_to_conn/2 already logs it
                    connecting
            end
    end.
234

235
%% Health probe for one connection: runs a trivial SELECT and returns `true`
%% when the first element of the reply tuple is `ok`, `false` otherwise.
do_get_status(Conn) ->
    Reply = mysql:query(Conn, <<"SELECT count(1) AS T">>),
    element(1, Reply) == ok.
753✔
237

238
%% Verifies the prepared statements of an already connected resource.
%%
%% Clause 1 - the state holds a `send_message` statement: each pool worker
%%   tries to prepare that SQL under the name `get_status`. MySQL error 1146
%%   yields `{error, {undefined_table, State}}`; a successful prepare is
%%   followed by an unprepare whose result becomes the accumulator. Once the
%%   accumulator is no longer `ok`, the remaining workers are skipped.
%%   Workers without a usable client, and other prepare errors, count as `ok`.
%% Clause 2 - any other statement map: nothing to verify, `ok`.
%% Clause 3 - the statements are marked `{error, Prepares}` (an earlier
%%   prepare failed): prepare them again. On success the marker is removed
%%   and `{ok, NewState}` is returned.
do_check_prepares(
    #{
        pool_name := PoolName,
        prepare_statement := #{send_message := SQL}
    } = State
) ->
    % it's already connected. Verify if target table still exists
    CheckWorker = fun
        (WorkerPid, ok) ->
            case ecpool_worker:client(WorkerPid) of
                {ok, Conn} ->
                    case mysql:prepare(Conn, get_status, SQL) of
                        {error, {1146, _, _}} ->
                            {error, {undefined_table, State}};
                        {ok, Statement} ->
                            mysql:unprepare(Conn, Statement);
                        _ ->
                            ok
                    end;
                _ ->
                    ok
            end;
        (_WorkerPid, Failure) ->
            Failure
    end,
    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
    lists:foldl(CheckWorker, ok, Workers);
do_check_prepares(#{prepare_statement := Statement}) when is_map(Statement) ->
    ok;
do_check_prepares(#{pool_name := PoolName, prepare_statement := {error, Prepares}} = State) ->
    %% retry to prepare
    case prepare_sql(Prepares, PoolName) of
        ok ->
            %% remove the error marker
            {ok, State#{prepare_statement => Prepares}};
        {error, undefined_table} ->
            %% State already holds `{error, Prepares}` (matched in the head),
            %% so it is handed back as is to indicate the error
            {error, {undefined_table, State}};
        {error, _} = Failure ->
            Failure
    end.
282

283
%% ===================================================================
284

285
%% ecpool connect callback: opens one MySQL connection with `Options`.
connect(Options) -> mysql:start_link(Options).
639✔
287

288
%% Prepares the configured statements right after the pool has started.
%%
%% Returns the state unchanged when there is nothing to prepare or when
%% preparing succeeds. On failure the error is logged and the statements are
%% stored as `{error, Prepares}` in the returned state.
init_prepare(State = #{prepare_statement := Prepares, pool_name := PoolName}) ->
    %% `false` means the statement map is empty and prepare_sql/2 was not called
    case maps:size(Prepares) > 0 andalso prepare_sql(Prepares, PoolName) of
        false ->
            State;
        ok ->
            State;
        {error, Reason} ->
            ?SLOG(error, #{msg => <<"mysql_init_prepare_statement_failed">>, reason => Reason}),
            %% mark the prepare_statement as failed
            State#{prepare_statement => {error, Prepares}}
    end.
303

304
%% Prepares all statements on the pool again, but only when `SQLOrKey` is one
%% of the configured statement keys.
%%
%% `Prepares` is either the statement map or `{error, Map}`. The latter is
%% how init_prepare/1 and do_check_prepares/1 mark statements whose last
%% prepare attempt failed, and on_query/3 passes the state value through
%% unchanged. The marker is unwrapped here; handing the tuple straight to
%% maps:is_key/2 would raise `{badmap, _}`.
%%
%% Returns `ok`, the `{error, Reason}` of prepare_sql/2, or
%% `{error, {unrecoverable_error, prepared_statement_invalid}}` when
%% `SQLOrKey` is not a known statement key.
maybe_prepare_sql(SQLOrKey, {error, Prepares}, PoolName) when is_map(Prepares) ->
    maybe_prepare_sql(SQLOrKey, Prepares, PoolName);
maybe_prepare_sql(SQLOrKey, Prepares, PoolName) ->
    case maps:is_key(SQLOrKey, Prepares) of
        true -> prepare_sql(Prepares, PoolName);
        false -> {error, {unrecoverable_error, prepared_statement_invalid}}
    end.
309

310
%% Prepares the statements on every connection of the pool.
%%
%% `Prepares` may be a map or a list of `{Key, SQL}` pairs; a map is
%% converted to a list first. On success a reconnect callback is registered
%% so that prepare_sql_to_conn/2 runs with the same list when a connection
%% is re-established.
prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
    prepare_sql(maps:to_list(Prepares), PoolName);
prepare_sql(PrepareList, PoolName) ->
    case do_prepare_sql(PrepareList, PoolName) of
        ok ->
            %% prepare for reconnect
            OnReconnect = {?MODULE, prepare_sql_to_conn, [PrepareList]},
            ecpool:add_reconnect_callback(PoolName, OnReconnect),
            ok;
        {error, Reason} ->
            {error, Reason}
    end.
321

322
%% Collects the client connection of every pool worker and prepares the
%% statements on each of them (prepare_sql_to_conn_list/2).
%%
%% Crashes (badmatch) when a worker does not return `{ok, Conn}`.
do_prepare_sql(Prepares, PoolName) ->
    ClientOf = fun(Worker) ->
        {ok, Conn} = ecpool_worker:client(Worker),
        Conn
    end,
    Conns = [ClientOf(Worker) || {_Name, Worker} <- ecpool:workers(PoolName)],
    prepare_sql_to_conn_list(Conns, Prepares).
163✔
332

333
%% Prepares `PrepareList` on each connection in turn and stops at the first
%% failure.
%%
%% On failure every key of `PrepareList` is unprepared on the connection
%% that failed (the results are ignored) and the error is returned.
%% Connections handled before it are not touched again.
prepare_sql_to_conn_list([], _PrepareList) ->
    ok;
prepare_sql_to_conn_list([Conn | Remaining], PrepareList) ->
    case prepare_sql_to_conn(Conn, PrepareList) of
        ok ->
            prepare_sql_to_conn_list(Remaining, PrepareList);
        {error, Reason} ->
            %% rollback
            lists:foreach(
                fun({Key, _}) ->
                    _ = unprepare_sql_to_conn(Conn, Key),
                    ok
                end,
                PrepareList
            ),
            {error, Reason}
    end.
348

349
%% Prepares each `{Key, SQL}` pair on one connection, in list order.
%% Also registered as the ecpool reconnect callback by prepare_sql/2.
%%
%% Before preparing, the statement is unprepared under the same key; the
%% result of that call is ignored. Processing stops at the first failure:
%%   * MySQL error 1146 is reported as `{error, undefined_table}`,
%%   * any other error is returned as `{error, Reason}`.
prepare_sql_to_conn(Conn, []) when is_pid(Conn) ->
    ok;
prepare_sql_to_conn(Conn, [{Key, SQL} | Remaining]) when is_pid(Conn) ->
    Meta = #{msg => "mysql_prepare_statement", name => Key, prepare_sql => SQL},
    ?SLOG(info, Meta),
    _ = unprepare_sql_to_conn(Conn, Key),
    PrepareResult = mysql:prepare(Conn, Key, SQL),
    case PrepareResult of
        {ok, _Key} ->
            ?SLOG(info, Meta#{result => success}),
            prepare_sql_to_conn(Conn, Remaining);
        {error, {1146, _, _} = Reason} ->
            %% Target table is not created
            ?tp(mysql_undefined_table, #{}),
            ?SLOG(error, Meta#{result => failed, reason => Reason}),
            {error, undefined_table};
        {error, Reason} ->
            % FIXME: we should try to differ on transient failures and
            % syntax failures. Retrying syntax failures is not very productive.
            ?SLOG(error, Meta#{result => failed, reason => Reason}),
            {error, Reason}
    end.
369

370
%% Unprepares the statement registered under `PrepareSqlKey` on `Conn` and
%% returns whatever mysql:unprepare/2 returns.
unprepare_sql_to_conn(Conn, PrepareSqlKey) -> mysql:unprepare(Conn, PrepareSqlKey).
603✔
372

373
%% Builds the SQL-related part of the resource state from the config.
%%
%% Statement source, in order of precedence:
%%   1. `prepare_statement` from the config, used as is;
%%   2. `sql` from the config, registered under the key `send_message`;
%%   3. otherwise no statements at all.
parse_prepare_sql(Config) ->
    Explicit = maps:get(prepare_statement, Config, undefined),
    Template = maps:get(sql, Config, undefined),
    Statements =
        case {Explicit, Template} of
            {undefined, undefined} -> #{};
            {undefined, _} -> #{send_message => Template};
            {_, _} -> Explicit
        end,
    parse_prepare_sql(maps:to_list(Statements), #{}, #{}, #{}, #{}).
171✔
385

386
%% Walks the `{Key, SQL}` list and accumulates four maps keyed by statement
%% key. When the list is exhausted they are returned as a map with the keys
%% `prepare_statement`, `params_tokens`, `batch_inserts` and
%% `batch_params_tokens`.
%%
%% For each entry the SQL is run through emqx_placeholder:preproc_sql/1 and
%% the two results are stored; parse_batch_prepare_sql/5 then handles the
%% batch part of the same entry and continues with the rest of the list.
parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) ->
    #{
        prepare_statement => Prepares,
        params_tokens => Tokens,
        batch_inserts => BatchInserts,
        batch_params_tokens => BatchTks
    };
parse_prepare_sql([{Key, SQL} | _] = Entries, Prepares, Tokens, BatchInserts, BatchTks) ->
    {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(SQL),
    Prepares1 = Prepares#{Key => PrepareSQL},
    Tokens1 = Tokens#{Key => ParamsTokens},
    parse_batch_prepare_sql(Entries, Prepares1, Tokens1, BatchInserts, BatchTks).
398

399
%% Handles the batch part of the first `{Key, SQL}` entry, then continues
%% with the rest of the list in parse_prepare_sql/5.
%%
%% Only an `insert` statement that emqx_utils_sql:parse_insert/1 can split
%% adds to the batch maps: the insert part is stored in `BatchInserts` and
%% the pre-processed params template in `BatchTks`. In every other case
%% (select, unsupported type, detection or split failure) the batch maps
%% stay unchanged; all of these except `select` are logged as errors.
parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) ->
    Unchanged = {BatchInserts, BatchTks},
    {Inserts1, Tks1} =
        case emqx_utils_sql:get_statement_type(H) of
            select ->
                Unchanged;
            insert ->
                case emqx_utils_sql:parse_insert(H) of
                    {ok, {InsertSQL, Params}} ->
                        ParamsTks = emqx_placeholder:preproc_tmpl(Params),
                        {BatchInserts#{Key => InsertSQL}, BatchTks#{Key => ParamsTks}};
                    {error, Reason} ->
                        ?SLOG(error, #{msg => "split_sql_failed", sql => H, reason => Reason}),
                        Unchanged
                end;
            Type when is_atom(Type) ->
                ?SLOG(error, #{msg => "detect_sql_type_unsupported", sql => H, type => Type}),
                Unchanged;
            {error, Reason} ->
                ?SLOG(error, #{msg => "detect_sql_type_failed", sql => H, reason => Reason}),
                Unchanged
        end,
    parse_prepare_sql(T, Prepares, Tokens, Inserts1, Tks1).
425

426
%% Resolves what is sent to the driver, as `{SQLOrKey, Params}`.
%%
%%   * `query` and `prepared_query`: the SQL/key and the params are passed
%%     through untouched; the state is not inspected.
%%   * any other key: if `params_tokens` in the state holds tokens for that
%%     key, the key itself names the statement and the params are rendered
%%     from `SQLOrData` with emqx_placeholder:proc_sql/2; otherwise the
%%     arguments are passed through untouched.
proc_sql_params(Type, SQLOrKey, Params, _State) when
    Type =:= query; Type =:= prepared_query
->
    {SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) ->
    Tokens = maps:get(TypeOrKey, ParamsTokens, undefined),
    case Tokens of
        undefined -> {SQLOrData, Params};
        _ -> {TypeOrKey, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
    end.
437

438
%% Sends a whole batch as a single INSERT statement.
%%
%% One values string is rendered per `{_, Msg}` request with
%% emqx_placeholder:proc_param_str/3 (using the MySQL quoting fun). The
%% strings are joined with commas and appended to `InsertPart` and
%% `" values "` as an iolist. The statement runs through `query` with
%% `no_params` and the default timeout.
on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) ->
    Quote = fun emqx_placeholder:quote_mysql/1,
    Rows = [emqx_placeholder:proc_param_str(Tokens, Msg, Quote) || {_, Msg} <- BatchReqs],
    Query = [InsertPart, <<" values ">> | lists:join($,, Rows)],
    on_sql_query(InstId, query, Query, no_params, default_timeout, State).
761✔
445

446
%% Picks a worker from the pool and runs the query on its connection.
%%
%% When the worker reports `{error, disconnected}` the query is not sent and
%% `{error, {recoverable_error, disconnected}}` is returned. Otherwise the
%% result of do_sql_query/6 is returned.
on_sql_query(
    InstId,
    SQLFunc,
    SQLOrKey,
    Params,
    Timeout,
    #{pool_name := PoolName} = State
) ->
    TraceMeta = #{connector => InstId, sql => SQLOrKey, state => State},
    ?TRACE("QUERY", "mysql_connector_received", TraceMeta),
    Worker = ecpool:get_client(PoolName),
    case ecpool_worker:client(Worker) of
        {error, disconnected} ->
            ?tp(
                error,
                "mysql_connector_do_sql_query_failed",
                TraceMeta#{reason => worker_is_disconnected}
            ),
            {error, {recoverable_error, disconnected}};
        {ok, Conn} ->
            ?tp(
                mysql_connector_send_query,
                #{sql_func => SQLFunc, sql_or_key => SQLOrKey, data => Params}
            ),
            do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, TraceMeta)
    end.
472

473
%% Calls `mysql:SQLFunc/5` on `Conn` and classifies the outcome:
%%
%%   {error, disconnected}             -> logged, `Conn` is sent an exit
%%                                        signal with reason `restart`,
%%                                        {error, {recoverable_error, disconnected}}
%%   {error, not_prepared}             -> logged as warning, returned as is
%%   {error, {1053, <<"08S01">>, R}}   -> {error, {recoverable_error, R}}
%%   {error, Reason}                   -> {error, {unrecoverable_error, Reason}}
%%   anything else                     -> returned as is
%%   `error:badarg` raised by the call -> {error, {unrecoverable_error,
%%                                                 {invalid_params, Params}}}
%%
%% Only the driver call itself is covered by the `catch`; the result
%% handling in the `of` section is not.
do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta) ->
    try mysql:SQLFunc(Conn, SQLOrKey, Params, no_filtermap_fun, Timeout) of
        {error, disconnected} ->
            ?SLOG(
                error,
                LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}
            ),
            %% kill the pool worker to trigger reconnection
            _ = exit(Conn, restart),
            {error, {recoverable_error, disconnected}};
        {error, not_prepared} = NotPrepared ->
            ?tp(
                mysql_connector_prepare_query_failed,
                #{error => not_prepared}
            ),
            ?SLOG(
                warning,
                LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
            ),
            NotPrepared;
        {error, {1053, <<"08S01">>, ShutdownReason}} ->
            %% mysql sql server shutdown in progress
            ?SLOG(
                error,
                LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => ShutdownReason}
            ),
            {error, {recoverable_error, ShutdownReason}};
        {error, OtherReason} ->
            ?SLOG(
                error,
                LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => OtherReason}
            ),
            {error, {unrecoverable_error, OtherReason}};
        Reply ->
            ?tp(
                mysql_connector_query_return,
                #{result => Reply}
            ),
            Reply
    catch
        error:badarg ->
            ?SLOG(
                error,
                LogMeta#{msg => "mysql_connector_invalid_params", params => Params}
            ),
            {error, {unrecoverable_error, {invalid_params, Params}}}
    end.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc