• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

processone / ejabberd / 444

pending completion
444

push

github

Paweł Chmielowski
Fix typo in last commit

13452 of 40447 relevant lines covered (33.26%)

673.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.81
/src/mod_proxy65_stream.erl
1
%%%----------------------------------------------------------------------
2
%%% File    : mod_proxy65_stream.erl
3
%%% Author  : Evgeniy Khramtsov <xram@jabber.ru>
4
%%% Purpose : Bytestream process.
5
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
6
%%%
7
%%% ejabberd, Copyright (C) 2002-2022   ProcessOne
8
%%%
9
%%% This program is free software; you can redistribute it and/or
10
%%% modify it under the terms of the GNU General Public License as
11
%%% published by the Free Software Foundation; either version 2 of the
12
%%% License, or (at your option) any later version.
13
%%%
14
%%% This program is distributed in the hope that it will be useful,
15
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
16
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
%%% General Public License for more details.
18
%%%
19
%%% You should have received a copy of the GNU General Public License along
20
%%% with this program; if not, write to the Free Software Foundation, Inc.,
21
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22
%%%
23
%%%----------------------------------------------------------------------
24

25
-module(mod_proxy65_stream).
26

27
-author('xram@jabber.ru').
28

29
-behaviour(p1_fsm).
30
-behaviour(ejabberd_listener).
31

32
%% gen_fsm callbacks.
33
-export([init/1, handle_event/3, handle_sync_event/4,
34
         code_change/4, handle_info/3, terminate/3]).
35

36
%% gen_fsm states.
37
-export([accepting/2, wait_for_init/2, wait_for_auth/2,
38
         wait_for_request/2, wait_for_activation/2,
39
         stream_established/2]).
40

41
-export([start/3, stop/1, start_link/3, activate/2,
42
         relay/3, accept/1, listen_options/0]).
43

44
-include("mod_proxy65.hrl").
45

46
-include("logger.hrl").
47

48
-define(WAIT_TIMEOUT, 60000).
49

50
-record(state,
51
        {socket :: inet:socket(),
52
         timer = make_ref() :: reference(),
53
         sha1 = <<"">> :: binary(),
54
         host = <<"">> :: binary(),
55
         auth_type = anonymous :: plain | anonymous,
56
         shaper = none :: ejabberd_shaper:shaper()}).
57

58
%% Unused callbacks
59
handle_event(_Event, StateName, StateData) ->
60
    {next_state, StateName, StateData}.
×
61

62
code_change(_OldVsn, StateName, StateData, _Extra) ->
63
    {ok, StateName, StateData}.
×
64

65
%%-------------------------------
66

67
start(gen_tcp, Socket, Opts) ->
68
    Host = proplists:get_value(server_host, Opts),
×
69
    p1_fsm:start(?MODULE, [Socket, Host], []).
×
70

71
start_link(gen_tcp, Socket, Opts) ->
72
    Host = proplists:get_value(server_host, Opts),
2✔
73
    p1_fsm:start_link(?MODULE, [Socket, Host], []).
2✔
74

75
init([Socket, Host]) ->
76
    process_flag(trap_exit, true),
2✔
77
    AuthType = mod_proxy65_opt:auth_type(Host),
2✔
78
    Shaper = mod_proxy65_opt:shaper(Host),
2✔
79
    RecvBuf = mod_proxy65_opt:recbuf(Host),
2✔
80
    SendBuf = mod_proxy65_opt:sndbuf(Host),
2✔
81
    TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
2✔
82
    inet:setopts(Socket, [{recbuf, RecvBuf}, {sndbuf, SendBuf}]),
2✔
83
    {ok, accepting,
2✔
84
     #state{host = Host, auth_type = AuthType,
85
            socket = Socket, shaper = Shaper, timer = TRef}}.
86

87
terminate(_Reason, StateName, #state{sha1 = SHA1}) ->
88
    Mod = gen_mod:ram_db_mod(global, mod_proxy65),
2✔
89
    Mod:unregister_stream(SHA1),
2✔
90
    if StateName == stream_established ->
2✔
91
           ?INFO_MSG("(~w) Bytestream terminated", [self()]);
2✔
92
       true -> ok
×
93
    end.
94

95
%%%------------------------------
96
%%% API.
97
%%%------------------------------
98
accept(StreamPid) ->
99
    p1_fsm:send_event(StreamPid, accept).
2✔
100

101
stop(StreamPid) -> StreamPid ! stop.
×
102

103
activate({P1, J1}, {P2, J2}) ->
104
    case catch {p1_fsm:sync_send_all_state_event(P1,
1✔
105
                                                  get_socket),
106
                p1_fsm:sync_send_all_state_event(P2, get_socket)}
107
        of
108
      {S1, S2} when is_port(S1), is_port(S2) ->
109
          P1 ! {activate, P2, S2, J1, J2},
1✔
110
          P2 ! {activate, P1, S1, J1, J2},
1✔
111
          JID1 = jid:encode(J1),
1✔
112
          JID2 = jid:encode(J2),
1✔
113
          ?INFO_MSG("(~w:~w) Activated bytestream for ~ts "
1✔
114
                    "-> ~ts",
115
                    [P1, P2, JID1, JID2]),
1✔
116
          ok;
1✔
117
      _ -> error
×
118
    end.
119

120
%%%-----------------------
121
%%% States
122
%%%-----------------------
123
accepting(accept, State) ->
124
    inet:setopts(State#state.socket, [{active, true}]),
2✔
125
    {next_state, wait_for_init, State}.
2✔
126

127
wait_for_init(Packet,
128
              #state{socket = Socket, auth_type = AuthType} =
129
                  StateData) ->
130
    case mod_proxy65_lib:unpack_init_message(Packet) of
2✔
131
      {ok, AuthMethods} ->
132
          Method = select_auth_method(AuthType, AuthMethods),
2✔
133
          gen_tcp:send(Socket,
2✔
134
                       mod_proxy65_lib:make_init_reply(Method)),
135
          case Method of
2✔
136
            ?AUTH_ANONYMOUS ->
137
                {next_state, wait_for_request, StateData};
2✔
138
            ?AUTH_PLAIN -> {next_state, wait_for_auth, StateData};
×
139
            ?AUTH_NO_METHODS -> {stop, normal, StateData}
×
140
          end;
141
      error -> {stop, normal, StateData}
×
142
    end.
143

144
wait_for_auth(Packet,
145
              #state{socket = Socket, host = Host} = StateData) ->
146
    case mod_proxy65_lib:unpack_auth_request(Packet) of
×
147
      {User, Pass} ->
148
          Result = ejabberd_auth:check_password(User, <<"">>, Host, Pass),
×
149
          gen_tcp:send(Socket,
×
150
                       mod_proxy65_lib:make_auth_reply(Result)),
151
          case Result of
×
152
            true -> {next_state, wait_for_request, StateData};
×
153
            false -> {stop, normal, StateData}
×
154
          end;
155
      _ -> {stop, normal, StateData}
×
156
    end.
157

158
wait_for_request(Packet,
159
                 #state{socket = Socket} = StateData) ->
160
    Request = mod_proxy65_lib:unpack_request(Packet),
2✔
161
    case Request of
2✔
162
      #s5_request{sha1 = SHA1, cmd = connect} ->
163
          Mod = gen_mod:ram_db_mod(global, mod_proxy65),
2✔
164
          case Mod:register_stream(SHA1, self()) of
2✔
165
            ok ->
166
                inet:setopts(Socket, [{active, false}]),
2✔
167
                gen_tcp:send(Socket,
2✔
168
                             mod_proxy65_lib:make_reply(Request)),
169
                {next_state, wait_for_activation,
2✔
170
                 StateData#state{sha1 = SHA1}};
171
            _ ->
172
                Err = mod_proxy65_lib:make_error_reply(Request),
×
173
                gen_tcp:send(Socket, Err),
×
174
                {stop, normal, StateData}
×
175
          end;
176
      #s5_request{cmd = udp} ->
177
          Err = mod_proxy65_lib:make_error_reply(Request,
×
178
                                                 ?ERR_COMMAND_NOT_SUPPORTED),
179
          gen_tcp:send(Socket, Err),
×
180
          {stop, normal, StateData};
×
181
      _ -> {stop, normal, StateData}
×
182
    end.
183

184
wait_for_activation(_Data, StateData) ->
185
    {next_state, wait_for_activation, StateData}.
×
186

187
stream_established(_Data, StateData) ->
188
    {next_state, stream_established, StateData}.
×
189

190
%%%-----------------------
191
%%% Callbacks processing
192
%%%-----------------------
193

194
%% SOCKS5 packets.
195
handle_info({tcp, _S, Data}, StateName, StateData)
196
    when StateName /= wait_for_activation ->
197
    misc:cancel_timer(StateData#state.timer),
4✔
198
    TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
4✔
199
    p1_fsm:send_event(self(), Data),
4✔
200
    {next_state, StateName, StateData#state{timer = TRef}};
4✔
201
%% Activation message.
202
handle_info({activate, PeerPid, PeerSocket, IJid, TJid},
203
            wait_for_activation, StateData) ->
204
    erlang:monitor(process, PeerPid),
2✔
205
    misc:cancel_timer(StateData#state.timer),
2✔
206
    MySocket = StateData#state.socket,
2✔
207
    Shaper = StateData#state.shaper,
2✔
208
    Host = StateData#state.host,
2✔
209
    MaxRate = find_maxrate(Shaper, IJid, TJid, Host),
2✔
210
    spawn_link(?MODULE, relay,
2✔
211
               [MySocket, PeerSocket, MaxRate]),
212
    {next_state, stream_established, StateData};
2✔
213
%% Socket closed
214
handle_info({tcp_closed, _Socket}, _StateName,
215
            StateData) ->
216
    {stop, normal, StateData};
×
217
handle_info({tcp_error, _Socket, _Reason}, _StateName,
218
            StateData) ->
219
    {stop, normal, StateData};
×
220
%% Got stop message.
221
handle_info(stop, _StateName, StateData) ->
222
    {stop, normal, StateData};
×
223
%% Either linked process or peer process died.
224
handle_info({'EXIT', _, _}, _StateName, StateData) ->
225
    {stop, normal, StateData};
2✔
226
handle_info({'DOWN', _, _, _, _}, _StateName,
227
            StateData) ->
228
    {stop, normal, StateData};
×
229
%% Packets of no interest
230
handle_info(_Info, StateName, StateData) ->
231
    {next_state, StateName, StateData}.
×
232

233
%% Socket request.
234
handle_sync_event(get_socket, _From,
235
                  wait_for_activation, StateData) ->
236
    Socket = StateData#state.socket,
2✔
237
    {reply, Socket, wait_for_activation, StateData};
2✔
238
handle_sync_event(_Event, _From, StateName,
239
                  StateData) ->
240
    {reply, error, StateName, StateData}.
×
241

242
%%%-------------------------------------------------
243
%%% Relay Process.
244
%%%-------------------------------------------------
245
relay(MySocket, PeerSocket, Shaper) ->
246
    case gen_tcp:recv(MySocket, 0) of
3✔
247
        {ok, Data} ->
248
            case gen_tcp:send(PeerSocket, Data) of
1✔
249
                ok ->
250
                    {NewShaper, Pause} = ejabberd_shaper:update(Shaper, byte_size(Data)),
1✔
251
                    if Pause > 0 -> timer:sleep(Pause);
1✔
252
                       true -> pass
1✔
253
                    end,
254
                    relay(MySocket, PeerSocket, NewShaper);
1✔
255
                {error, _} = Err ->
256
                    Err
×
257
            end;
258
        {error, _} = Err ->
259
            Err
2✔
260
    end.
261

262
%%%------------------------
263
%%% Auxiliary functions
264
%%%------------------------
265
select_auth_method(plain, AuthMethods) ->
266
    case lists:member(?AUTH_PLAIN, AuthMethods) of
×
267
      true -> ?AUTH_PLAIN;
×
268
      false -> ?AUTH_NO_METHODS
×
269
    end;
270
select_auth_method(anonymous, AuthMethods) ->
271
    case lists:member(?AUTH_ANONYMOUS, AuthMethods) of
2✔
272
      true -> ?AUTH_ANONYMOUS;
2✔
273
      false -> ?AUTH_NO_METHODS
×
274
    end.
275

276
%% Obviously, we must use shaper with maximum rate.
277
find_maxrate(Shaper, JID1, JID2, Host) ->
278
    R1 = ejabberd_shaper:match(Host, Shaper, JID1),
2✔
279
    R2 = ejabberd_shaper:match(Host, Shaper, JID2),
2✔
280
    R = case ejabberd_shaper:get_max_rate(R1) >= ejabberd_shaper:get_max_rate(R2) of
2✔
281
            true -> R1;
2✔
282
            false -> R2
×
283
        end,
284
    ejabberd_shaper:new(R).
2✔
285

286
listen_options() ->
287
    [].
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc