• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12235783303

09 Dec 2024 12:36PM UTC coverage: 82.037%. First build
12235783303

Pull #14362

github

web-flow
Merge 4819ded51 into 83154d24b
Pull Request #14362: refactor(resource): forbid changing resource state from `on_get_status` return

62 of 82 new or added lines in 27 files covered. (75.61%)

56457 of 68819 relevant lines covered (82.04%)

15149.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.54
/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4
-module(emqx_bridge_mysql_connector).
5

6
-behaviour(emqx_resource).
7

8
-include_lib("emqx_resource/include/emqx_resource.hrl").
9
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
10

11
%% `emqx_resource' API
12
-export([
13
    on_remove_channel/3,
14
    resource_type/0,
15
    callback_mode/0,
16
    on_add_channel/4,
17
    on_batch_query/3,
18
    on_get_channel_status/3,
19
    on_get_channels/1,
20
    on_get_status/2,
21
    on_query/3,
22
    on_start/2,
23
    on_stop/2
24
]).
25

26
%%========================================================================================
27
%% `emqx_resource' API
28
%%========================================================================================
29
%% @doc Resource type of this connector; delegated to `emqx_mysql'.
resource_type() ->
    emqx_mysql:resource_type().
158✔
30

31
%% @doc Callback mode of this connector; delegated to `emqx_mysql'.
callback_mode() ->
    emqx_mysql:callback_mode().
158✔
32

33
%% @doc Adds the channel `ChannelId' to the connector state.
%%
%% The `parameters' entry of the channel config is unindented, the result is
%% passed to `emqx_mysql:parse_prepare_sql/2', and the returned templates are
%% checked by `validate_sql_type/3'. When that check passes, the statements
%% are prepared through `set_prepares/2'.
%%
%% Returns `{ok, NewState}' with the channel stored under `channels' when the
%% `prepares' entry is `ok', and `{error, Reason}' when validation or
%% preparation failed.
on_add_channel(
    _InstanceId,
    #{channels := Channels, connector_state := ConnectorState} = State0,
    ChannelId,
    ChannelConfig0
) ->
    FlatConfig = emqx_utils_maps:unindent(parameters, ChannelConfig0),
    Templates = emqx_mysql:parse_prepare_sql(ChannelId, FlatConfig),
    case validate_sql_type(ChannelId, FlatConfig, Templates) of
        {error, _} = ValidationError ->
            ValidationError;
        ok ->
            ChannelConfig = set_prepares(maps:merge(FlatConfig, Templates), ConnectorState),
            case maps:get(prepares, ChannelConfig) of
                ok ->
                    %% Only channels whose statements were prepared are stored.
                    {ok, State0#{channels => Channels#{ChannelId => ChannelConfig}}};
                {error, undefined_table} ->
                    {error, {unhealthy_target, <<"Undefined table">>}};
                {error, {Code, ErrState, Msg}} ->
                    PrepareContext = #{
                        code => Code,
                        state => ErrState,
                        message => Msg
                    },
                    {error, {prepare_statement, PrepareContext}}
            end
    end.
65

66
%% @doc Reports the status of channel `ChannelId' from its stored `prepares'
%% entry: `ok' maps to connected, `{error, _}' maps to connecting.
%%
%% Raises if the channel is not present in `channels'.
on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
    ChannelConfig = maps:get(ChannelId, Channels),
    case ChannelConfig of
        #{prepares := {error, _}} ->
            ?status_connecting;
        #{prepares := ok} ->
            ?status_connected
    end.
73

74
%% @doc Returns the channels of connector `InstanceId', as reported by
%% `emqx_bridge_v2:get_channels_for_connector/1'.
on_get_channels(InstanceId) ->
    emqx_bridge_v2:get_channels_for_connector(InstanceId).
676✔
76

77
%% @doc Connector health check: forwards the wrapped `connector_state' to
%% `emqx_mysql:on_get_status/2' and returns its result unchanged.
on_get_status(InstanceId, #{connector_state := ConnectorState}) ->
    emqx_mysql:on_get_status(InstanceId, ConnectorState).
351✔
79

80
%% @doc Runs a single query.
%%
%% 2- and 3-element requests are first padded to the 4-element form
%% `{TypeOrKey, SQLOrKey, Params, Timeout}', using `[]' for missing params and
%% `default_timeout' for a missing timeout.
%%
%% When the first element of the request is a binary it is treated as a
%% channel id: the channel config is merged over the connector state (channel
%% keys win), `channel_id' is added, and the query is delegated to
%% `emqx_mysql:on_query/3'. Raises if the channel is unknown.
%%
%% Any other request is delegated with the plain connector state.
on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
    on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);
on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) ->
    on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, State);
on_query(
    InstanceId,
    {ChannelId, _Message, _Params, _Timeout} = Request,
    #{channels := Channels, connector_state := ConnectorState}
) when is_binary(ChannelId) ->
    ChannelConfig = maps:get(ChannelId, Channels),
    QueryState = (maps:merge(ConnectorState, ChannelConfig))#{channel_id => ChannelId},
    Result = emqx_mysql:on_query(InstanceId, Request, QueryState),
    ?tp(mysql_connector_on_query_return, #{instance_id => InstanceId, result => Result}),
    Result;
on_query(InstanceId, Request, #{channels := _, connector_state := ConnectorState}) ->
    emqx_mysql:on_query(InstanceId, Request, ConnectorState).
12✔
101

102
%% @doc Runs a batch of queries.
%%
%% When the first element of the first request is a binary it is treated as
%% the channel id for the whole batch: the channel config is merged over the
%% connector state (channel keys win), `channel_id' is added, and the batch is
%% delegated to `emqx_mysql:on_batch_query/4' together with the channel config.
%% Raises if the channel is unknown.
%%
%% Any other batch is delegated with the plain connector state and an empty
%% channel config.
on_batch_query(
    InstanceId,
    [FirstRequest | _] = BatchRequest,
    #{channels := Channels, connector_state := ConnectorState}
) when is_binary(element(1, FirstRequest)) ->
    ChannelId = element(1, FirstRequest),
    ChannelConfig = maps:get(ChannelId, Channels),
    BatchState = (maps:merge(ConnectorState, ChannelConfig))#{channel_id => ChannelId},
    Result = emqx_mysql:on_batch_query(InstanceId, BatchRequest, BatchState, ChannelConfig),
    ?tp(mysql_connector_on_batch_query_return, #{instance_id => InstanceId, result => Result}),
    Result;
on_batch_query(InstanceId, BatchRequest, #{connector_state := ConnectorState}) ->
    emqx_mysql:on_batch_query(InstanceId, BatchRequest, ConnectorState, #{}).
12✔
121

122
%% @doc Removes channel `ChannelId' from the connector state.
%%
%% For a known channel, `emqx_mysql:unprepare_sql/2' is called before the
%% channel is dropped from `channels'. Removing an unknown channel (or using a
%% state without a matching `channels' entry) is a no-op that returns the
%% state unchanged.
on_remove_channel(
    _InstanceId,
    #{channels := Channels, connector_state := ConnectorState} = State,
    ChannelId
) when is_map_key(ChannelId, Channels) ->
    {ChannelConfig, RemainingChannels} = maps:take(ChannelId, Channels),
    %% NOTE(review): connector state keys take precedence over channel config
    %% keys here (the second argument of maps:merge/2 wins), which is the
    %% opposite of the merge order in on_query/3 and on_batch_query/3 --
    %% confirm this is intended.
    emqx_mysql:unprepare_sql(ChannelId, maps:merge(ChannelConfig, ConnectorState)),
    {ok, State#{channels => RemainingChannels}};
on_remove_channel(_InstanceId, State, _ChannelId) ->
    {ok, State}.
8✔
131

132
%% @doc Starts the underlying `emqx_mysql' connector.
%%
%% On success the returned connector state is wrapped in a map together with
%% an empty `channels' map; an `{error, Reason}' result is returned as is.
-spec on_start(binary(), hocon:config()) ->
    {ok, #{connector_state := emqx_mysql:state(), channels := map()}} | {error, _}.
on_start(InstanceId, Config) ->
    case emqx_mysql:on_start(InstanceId, Config) of
        {error, _Reason} = Error ->
            Error;
        {ok, ConnectorState} ->
            {ok, #{connector_state => ConnectorState, channels => #{}}}
    end.
145

146
%% @doc Stops the underlying `emqx_mysql' connector and emits the
%% `mysql_connector_stopped' trace point.
%%
%% Crashes with `badmatch' if `emqx_mysql:on_stop/2' does not return `ok'.
on_stop(InstanceId, #{connector_state := ConnectorState}) ->
    ok = emqx_mysql:on_stop(InstanceId, ConnectorState),
    ?tp(mysql_connector_stopped, #{instance_id => InstanceId}),
    ok.
149✔
150

151
%%========================================================================================
152
%% Helper fns
153
%%========================================================================================
154
%% Runs `emqx_mysql:init_prepare/1' on the connector state overlaid with the
%% channel config (channel keys win) and stores the resulting `prepares'
%% value in the channel config. No other key of the result is kept.
set_prepares(ChannelConfig, ConnectorState) ->
    PrepareInput = maps:merge(ConnectorState, ChannelConfig),
    #{prepares := Prepares} = emqx_mysql:init_prepare(PrepareInput),
    ChannelConfig#{prepares => Prepares}.
145✔
158

159
%% Checks that the parsed query templates contain the statement kind required
%% by the channel's batching mode.
%%
%% The mode is `batch' when `resource_opts.batch_size' compares greater than 1
%% and `single' otherwise. Batch mode requires a `{ChannelId, batch}' template;
%% single mode requires a `{ChannelId, prepstmt}' template.
%%
%% Returns `ok', or `{error, Context}' where `Context' is a map holding the
%% reason, the statement type of the configured SQL and the operation type
%% (plus a hint for UPDATE statements in batch mode).
validate_sql_type(ChannelId, ChannelConfig, #{query_templates := QueryTemplates}) ->
    Mode =
        case emqx_utils_maps:deep_get([resource_opts, batch_size], ChannelConfig) of
            Size when Size > 1 -> batch;
            _ -> single
        end,
    RequiredKey =
        case Mode of
            batch -> {ChannelId, batch};
            single -> {ChannelId, prepstmt}
        end,
    case QueryTemplates of
        #{RequiredKey := _} ->
            ok;
        _ ->
            %% try to provide helpful info
            SQL = maps:get(sql, ChannelConfig),
            StatementType = emqx_utils_sql:get_statement_type(SQL),
            BaseContext = #{
                reason => failed_to_prepare_statement,
                statement_type => StatementType,
                operation_type => Mode
            },
            {error, maybe_add_batch_hint(Mode, StatementType, BaseContext)}
    end.

%% Adds the "UPDATE not supported in batch" hint to a batch-mode error
%% context; single-mode contexts are returned untouched.
maybe_add_batch_hint(batch, StatementType, Context) ->
    emqx_utils_maps:put_if(
        Context,
        hint,
        <<"UPDATE statements are not supported for batch operations">>,
        StatementType =:= update
    );
maybe_add_batch_hint(single, _StatementType, Context) ->
    Context.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc