• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 6861295443

14 Nov 2023 08:56AM UTC coverage: 82.708% (+0.04%) from 82.664%
6861295443

push

github

web-flow
Merge pull request #11945 from kjellwinblad/kjell/shared_con/better_names_and_specs

emqx_bridge_v2 module improvements

114 of 123 new or added lines in 13 files covered. (92.68%)

82 existing lines in 18 files now uncovered.

35552 of 42985 relevant lines covered (82.71%)

6667.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.15
/apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% the gRPC client agent for ConnectionHandler service
18
-module(emqx_exproto_gcli).
19

20
-include_lib("emqx/include/logger.hrl").
21

22
-logger_header("[ExProtoGCli]").
23

24
-export([
25
    init/2,
26
    maybe_shoot/1,
27
    maybe_shoot/3,
28
    ack/2,
29
    is_empty/1
30
]).
31

32
-define(CONN_HANDLER_MOD, emqx_exproto_v_1_connection_handler_client).
33
-define(CONN_UNARY_HANDLER_MOD, emqx_exproto_v_1_connection_unary_handler_client).
34

35
-type service_name() :: 'ConnectionUnaryHandler' | 'ConnectionHandler'.
36

37
-type grpc_client_state() ::
38
    #{
39
        owner := pid(),
40
        service_name := service_name(),
41
        client_opts := options(),
42
        queue := queue:queue(),
43
        inflight := atom() | undefined,
44
        streams => map(),
45
        middleman => pid() | undefined
46
    }.
47

48
-type options() ::
49
    #{channel := term()}.
50

51
%%--------------------------------------------------------------------
52
%% APIs
53
%%--------------------------------------------------------------------
54

55
-spec init(service_name(), options()) -> grpc_client_state().
56
init(ServiceName, Options) ->
57
    #{
56✔
58
        owner => self(),
59
        service_name => ServiceName,
60
        client_opts => Options,
61
        queue => queue:new(),
62
        inflight => undefined
63
    }.
64

65
-spec maybe_shoot(atom(), map(), grpc_client_state()) -> grpc_client_state().
66
maybe_shoot(FunName, Req, GState = #{inflight := undefined}) ->
67
    shoot(FunName, Req, GState);
178✔
68
maybe_shoot(FunName, Req, GState) ->
69
    enqueue(FunName, Req, GState).
76✔
70

71
-spec maybe_shoot(grpc_client_state()) -> grpc_client_state().
72
maybe_shoot(GState = #{inflight := undefined, queue := Q}) ->
73
    case queue:is_empty(Q) of
166✔
74
        true ->
75
            GState;
91✔
76
        false ->
77
            {{value, {FunName, Req}}, Q1} = queue:out(Q),
75✔
78
            shoot(FunName, Req, GState#{queue => Q1})
75✔
79
    end.
80

81
-spec ack(atom(), grpc_client_state()) -> grpc_client_state().
82
ack(FunName, GState = #{inflight := FunName}) ->
83
    GState#{inflight => undefined, middleman => undefined};
199✔
84
ack(_, _) ->
85
    error(badarg).
×
86

87
-spec is_empty(grpc_client_state()) -> boolean().
88
is_empty(#{queue := Q, inflight := Inflight}) ->
89
    Inflight == undefined andalso queue:is_empty(Q).
263✔
90

91
%%--------------------------------------------------------------------
92
%% Internal funcs
93
%%--------------------------------------------------------------------
94

95
enqueue(FunName, Req, GState = #{queue := Q}) ->
96
    GState#{queue => queue:in({FunName, Req}, Q)}.
76✔
97

98
shoot(FunName, Req, GState) ->
99
    ServiceName = maps:get(service_name, GState),
253✔
100
    shoot(ServiceName, FunName, Req, GState).
253✔
101

102
shoot(
103
    'ConnectionUnaryHandler',
104
    FunName,
105
    Req,
106
    GState = #{owner := Owner, client_opts := Options}
107
) ->
108
    Pid =
190✔
109
        spawn(
110
            fun() ->
111
                try
190✔
112
                    Result = request(FunName, Req, Options),
190✔
113
                    hreply(Owner, Result, FunName)
190✔
114
                catch
115
                    T:R:Stk ->
UNCOV
116
                        hreply(Owner, {error, {{T, R}, Stk}}, FunName)
×
117
                end
118
            end
119
        ),
120
    GState#{inflight => FunName, middleman => Pid};
190✔
121
shoot(
122
    'ConnectionHandler',
123
    FunName,
124
    Req,
125
    GState
126
) ->
127
    GState1 = streaming(FunName, Req, GState),
63✔
128
    GState1#{inflight => FunName}.
63✔
129

130
%%--------------------------------------------------------------------
131
%% streaming
132

133
streaming(
134
    FunName,
135
    Req,
136
    GState = #{owner := Owner, client_opts := Options}
137
) ->
138
    Streams = maps:get(streams, GState, #{}),
63✔
139
    case ensure_stream_opened(FunName, Options, Streams) of
63✔
140
        {error, Reason} ->
141
            ?SLOG(error, #{
×
142
                msg => "request_grpc_server_failed",
143
                function => {FunName, Options},
144
                reason => Reason
145
            }),
×
146
            hreply(Owner, {error, Reason}, FunName),
×
147
            {ok, GState};
×
148
        {ok, Stream} ->
149
            case catch grpc_client:send(Stream, Req) of
63✔
150
                ok ->
151
                    ?SLOG(debug, #{
63✔
152
                        msg => "send_grpc_request_succeed",
153
                        function => FunName,
154
                        request => Req
155
                    }),
63✔
156
                    hreply(Owner, ok, FunName),
63✔
157
                    GState#{streams => Streams#{FunName => Stream}};
63✔
158
                {'EXIT', {not_found, _Stk}} ->
159
                    %% Not found the stream, reopen it
160
                    ?SLOG(info, #{
×
161
                        msg => "cannt_find_old_stream_ref",
162
                        function => FunName
163
                    }),
×
164
                    streaming(FunName, Req, GState#{streams => maps:remove(FunName, Streams)});
×
165
                {'EXIT', {timeout, _Stk}} ->
166
                    ?SLOG(error, #{
×
167
                        msg => "send_grpc_request_timeout",
168
                        function => FunName,
169
                        request => Req
170
                    }),
×
171
                    hreply(Owner, {error, timeout}, FunName),
×
172
                    GState;
×
173
                {'EXIT', {Reason1, Stk}} ->
174
                    ?SLOG(error, #{
×
175
                        msg => "send_grpc_request_failed",
176
                        function => FunName,
177
                        request => Req,
178
                        error => Reason1,
179
                        stacktrace => Stk
180
                    }),
×
181
                    hreply(Owner, {error, Reason1}, FunName),
×
182
                    GState
×
183
            end
184
    end.
185

186
ensure_stream_opened(FunName, Options, Streams) ->
187
    case maps:get(FunName, Streams, undefined) of
63✔
188
        undefined ->
189
            case apply(?CONN_HANDLER_MOD, FunName, [Options]) of
48✔
190
                {ok, Stream} -> {ok, Stream};
48✔
191
                {error, Reason} -> {error, Reason}
×
192
            end;
193
        Stream ->
194
            {ok, Stream}
15✔
195
    end.
196

197
%%--------------------------------------------------------------------
198
%% unary
199

200
request(FunName, Req, Options) ->
201
    case apply(?CONN_UNARY_HANDLER_MOD, FunName, [Req, Options]) of
190✔
202
        {ok, _EmptySucc, _Md} ->
203
            ok;
190✔
204
        {error, Reason} ->
205
            {error, Reason}
×
206
    end.
207

208
hreply(Owner, Result, FunName) ->
209
    Owner ! {hreply, FunName, Result},
253✔
210
    ok.
253✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc