• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 6861295443

14 Nov 2023 08:56AM UTC coverage: 82.708% (+0.04%) from 82.664%
6861295443

push

github

web-flow
Merge pull request #11945 from kjellwinblad/kjell/shared_con/better_names_and_specs

emqx_bridge_v2 module improvements

114 of 123 new or added lines in 13 files covered. (92.68%)

82 existing lines in 18 files now uncovered.

35552 of 42985 relevant lines covered (82.71%)

6667.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.23
/apps/emqx_ft/src/emqx_ft_assembler.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
-module(emqx_ft_assembler).
18

19
-export([start_link/4]).
20

21
-behaviour(gen_statem).
22
-export([callback_mode/0]).
23
-export([init/1]).
24
-export([handle_event/4]).
25
-export([terminate/3]).
26

27
-export([where/1]).
28

29
-type stdata() :: #{
30
    storage := emqx_ft_storage_fs:storage(),
31
    transfer := emqx_ft:transfer(),
32
    finopts := emqx_ft:finopts(),
33
    assembly := emqx_ft_assembly:t(),
34
    export => emqx_ft_storage_exporter:export()
35
}.
36

37
-define(NAME(Transfer), {n, l, {?MODULE, Transfer}}).
38
-define(REF(Transfer), {via, gproc, ?NAME(Transfer)}).
39

40
%%
41

42
start_link(Storage, Transfer, Size, Opts) ->
43
    gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size, Opts}, []).
1,120✔
44

45
where(Transfer) ->
46
    gproc:where(?NAME(Transfer)).
1,147✔
47

48
%%
49

50
-type state() ::
51
    idle
52
    | list_local_fragments
53
    | {list_remote_fragments, [node()]}
54
    | start_assembling
55
    | {assemble, [{node(), emqx_ft_storage_fs:filefrag()}]}
56
    | complete.
57

58
-define(internal(C), {next_event, internal, C}).
59

60
callback_mode() ->
61
    handle_event_function.
1,120✔
62

63
-spec init(_Args) -> {ok, state(), stdata()}.
64
init({Storage, Transfer, Size, Opts}) ->
65
    _ = erlang:process_flag(trap_exit, true),
1,120✔
66
    St = #{
1,120✔
67
        storage => Storage,
68
        transfer => Transfer,
69
        finopts => Opts,
70
        assembly => emqx_ft_assembly:new(Size)
71
    },
72
    {ok, idle, St}.
1,120✔
73

74
-spec handle_event(info | internal, _, state(), stdata()) ->
75
    {next_state, state(), stdata(), {next_event, internal, _}}
76
    | {stop, {shutdown, ok | {error, _}}, stdata()}.
77
handle_event(info, kickoff, idle, St) ->
78
    % NOTE
79
    % Someone's told us to start the work, which usually means that it has set up a monitor.
80
    % We could wait for this message and handle it at the end of the assembling rather than at
81
    % the beginning, however it would make error handling much more messier.
82
    {next_state, list_local_fragments, St, ?internal([])};
1,119✔
83
handle_event(info, kickoff, _, _St) ->
84
    keep_state_and_data;
×
85
handle_event(
86
    internal,
87
    _,
88
    list_local_fragments,
89
    St = #{storage := Storage, transfer := Transfer, assembly := Asm}
90
) ->
91
    % TODO: what we do with non-transients errors here (e.g. `eacces`)?
92
    {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
1,119✔
93
    NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
1,119✔
94
    NSt = St#{assembly := NAsm},
1,119✔
95
    case emqx_ft_assembly:status(NAsm) of
1,119✔
96
        complete ->
97
            {next_state, start_assembling, NSt, ?internal([])};
1,104✔
98
        {incomplete, _} ->
99
            Nodes = emqx:running_nodes() -- [node()],
15✔
100
            {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])};
15✔
101
        % TODO: recovery?
102
        {error, _} = Error ->
103
            {stop, {shutdown, Error}}
×
104
    end;
105
handle_event(
106
    internal,
107
    _,
108
    {list_remote_fragments, Nodes},
109
    St = #{transfer := Transfer, assembly := Asm}
110
) ->
111
    % TODO
112
    % Async would better because we would not need to wait for some lagging nodes if
113
    % the coverage is already complete.
114
    % TODO: portable "storage" ref
115
    Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, Transfer, fragment),
15✔
116
    NodeResults = lists:zip(Nodes, Results),
15✔
117
    NAsm = emqx_ft_assembly:update(
15✔
118
        lists:foldl(
119
            fun
120
                ({Node, {ok, {ok, Fragments}}}, Acc) ->
121
                    emqx_ft_assembly:append(Acc, Node, Fragments);
22✔
122
                ({_Node, _Result}, Acc) ->
123
                    % TODO: log?
UNCOV
124
                    Acc
×
125
            end,
126
            Asm,
127
            NodeResults
128
        )
129
    ),
130
    NSt = St#{assembly := NAsm},
15✔
131
    case emqx_ft_assembly:status(NAsm) of
15✔
132
        complete ->
133
            {next_state, start_assembling, NSt, ?internal([])};
11✔
134
        % TODO: retries / recovery?
135
        {incomplete, _} = Status ->
136
            {stop, {shutdown, {error, Status}}};
4✔
137
        {error, _} = Error ->
138
            {stop, {shutdown, Error}}
×
139
    end;
140
handle_event(
141
    internal,
142
    _,
143
    start_assembling,
144
    St = #{storage := Storage, transfer := Transfer, assembly := Asm}
145
) ->
146
    Filemeta = emqx_ft_assembly:filemeta(Asm),
1,115✔
147
    Coverage = emqx_ft_assembly:coverage(Asm),
1,115✔
148
    case emqx_ft_storage_exporter:start_export(Storage, Transfer, Filemeta) of
1,115✔
149
        {ok, Export} ->
150
            {next_state, {assemble, Coverage}, St#{export => Export}, ?internal([])};
1,115✔
151
        {error, _} = Error ->
152
            {stop, {shutdown, Error}}
×
153
    end;
154
handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export := Export}) ->
155
    % TODO
156
    % Currently, race is possible between getting segment info from the remote node and
157
    % this node garbage collecting the segment itself.
158
    % TODO: pipelining
159
    % TODO: better error handling
160
    {ok, Content} = pread(Node, Segment, St),
1,189✔
161
    case emqx_ft_storage_exporter:write(Export, Content) of
1,188✔
162
        {ok, NExport} ->
163
            {next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])};
1,188✔
164
        {error, _} = Error ->
165
            {stop, {shutdown, Error}, maps:remove(export, St)}
×
166
    end;
167
handle_event(internal, _, {assemble, []}, St = #{}) ->
168
    {next_state, complete, St, ?internal([])};
1,114✔
169
handle_event(internal, _, complete, St = #{export := Export, finopts := Opts}) ->
170
    Result = emqx_ft_storage_exporter:complete(Export, Opts),
1,114✔
171
    _ = maybe_garbage_collect(Result, St),
1,114✔
172
    {stop, {shutdown, Result}, maps:remove(export, St)}.
1,114✔
173

174
-spec terminate(_Reason, state(), stdata()) -> _.
175
terminate(_Reason, _StateName, #{export := Export}) ->
176
    emqx_ft_storage_exporter:discard(Export);
1✔
177
terminate(_Reason, _StateName, #{}) ->
178
    ok.
1,119✔
179

180
pread(Node, Segment, #{storage := Storage, transfer := Transfer}) when Node =:= node() ->
181
    emqx_ft_storage_fs:pread(Storage, Transfer, Segment, 0, segsize(Segment));
1,177✔
182
pread(Node, Segment, #{transfer := Transfer}) ->
183
    emqx_ft_storage_fs_proto_v1:pread(Node, Transfer, Segment, 0, segsize(Segment)).
12✔
184

185
%%
186

187
maybe_garbage_collect(ok, #{storage := Storage, transfer := Transfer, assembly := Asm}) ->
188
    Nodes = emqx_ft_assembly:nodes(Asm),
1,112✔
189
    emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes);
1,112✔
190
maybe_garbage_collect({error, _}, _St) ->
191
    ok.
2✔
192

193
segsize(#{fragment := {segment, Info}}) ->
194
    maps:get(size, Info).
1,189✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc