Skip to content

Commit

Permalink
Support application hot release
Browse files Browse the repository at this point in the history
  • Loading branch information
黄鹏举 committed Aug 3, 2022
1 parent 3ba3a8a commit 70b6ac8
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 3 deletions.
25 changes: 25 additions & 0 deletions src/esockd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
, close/1
]).

%% Support Hot Release
-export([ close_port/1
, close_port/2
, resume_port/1
, resume_port/2
]).

-export([ reopen/1
, reopen/2
]).
Expand Down Expand Up @@ -159,6 +166,24 @@ close({Proto, ListenOn}) when is_atom(Proto) ->
close(Proto, ListenOn) when is_atom(Proto) ->
esockd_sup:stop_listener(Proto, fixaddr(ListenOn)).

%% @doc just close port, don't kill esockd supervisor process made connection close
-spec(close_port({atom(), listen_on()}) -> ok | {error, term()}).
close_port({Proto, ListenOn}) when is_atom(Proto) ->
close_port(Proto, ListenOn).

-spec close_port(atom(), listen_on()) -> ok | {error, term()}.
close_port(Proto, ListenOn) when is_atom(Proto) ->
esockd_sup:close_port(Proto, fixaddr(ListenOn)).

%% @doc resume port when use close port
-spec(resume_port({atom(), listen_on()}) -> ok | {error, term()}).
resume_port({Proto, ListenOn}) when is_atom(Proto) ->
resume_port(Proto, ListenOn).

-spec resume_port(atom(), listen_on()) -> ok | {error, term()}.
resume_port(Proto, ListenOn) when is_atom(Proto) ->
esockd_sup:resume_port(Proto, fixaddr(ListenOn)).

%% @doc Reopen the listener
-spec(reopen({atom(), listen_on()}) -> {ok, pid()} | {error, term()}).
reopen({Proto, ListenOn}) when is_atom(Proto) ->
Expand Down
39 changes: 39 additions & 0 deletions src/esockd_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

-export([ options/1
, get_port/1
, get_sock/1
]).

-export([ resume_sock/2
]).

%% gen_server callbacks
Expand Down Expand Up @@ -65,6 +69,16 @@ options(Listener) ->
get_port(Listener) ->
gen_server:call(Listener, get_port).

-spec(get_sock(pid()) -> inet:socket()).
get_sock(Listener) ->
gen_server:call(Listener, get_sock).

-spec(resume_sock(pid(), pid()) -> ok | {error, term()}).
resume_sock(Listener, AcceptorSup) ->
case gen_server:call(Listener, {resume_sock, AcceptorSup}) of
ok -> ok;
{error, Reason} -> {error, Reason}
end.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -109,6 +123,31 @@ handle_call(options, _From, State = #state{options = Opts}) ->
handle_call(get_port, _From, State = #state{lport = LPort}) ->
{reply, LPort, State};

handle_call(get_sock, _From, State = #state{lsock = LSock}) ->
{reply, LSock, State};

handle_call({resume_sock, AcceptorSup}, _From,
State = #state{proto = Proto,
listen_on = ListenOn,
options = Opts}) ->
Port = port(ListenOn),
SockOpts = merge_addr(ListenOn, sockopts(Opts)),
case esockd_transport:listen(Port, [{active, false} | proplists:delete(active, SockOpts)]) of
{ok, LSock} ->
AcceptorNum = proplists:get_value(acceptors, Opts, ?ACCEPTOR_POOL),
lists:foreach(fun (_) ->
{ok, _APid} = esockd_acceptor_sup:start_acceptor(AcceptorSup, LSock)
end, lists:seq(1, AcceptorNum)),
{ok, {LAddr, LPort}} = inet:sockname(LSock),
%%error_logger:info_msg("~s listen on ~s:~p with ~p acceptors.~n",
%% [Proto, inet:ntoa(LAddr), LPort, AcceptorNum]),
{reply, ok, #state{lsock = LSock, laddr = LAddr, lport = LPort}};
{error, Reason} ->
error_logger:error_msg("~s failed to resume port ~p - ~p (~s)",
[Proto, Port, Reason, inet:format_error(Reason)]),
{reply, Reason, State}
end;

handle_call(Req, _From, State) ->
error_logger:error_msg("[~s] Unexpected call: ~p", [?MODULE, Req]),
{noreply, State}.
Expand Down
42 changes: 39 additions & 3 deletions src/esockd_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
, restart_listener/2
]).

-export([ close_port/2
, resume_port/2
]).

-export([ listeners/0
, listener/1
, listener_and_module/1
Expand Down Expand Up @@ -93,7 +97,23 @@ stop_listener(Proto, ListenOn) ->
case match_listeners(Proto, ListenOn) of
[] -> {error, not_found};
Listeners ->
return_ok_or_error([terminate_and_delete(ChildId) || ChildId <- Listeners])
return_ok_or_error([terminate_and_delete(ChildId) || {ChildId, _} <- Listeners])
end.

-spec(close_port(atom(), esockd:listen_on()) -> ok | {error, term()}).
close_port(Proto, ListenOn) ->
case match_listeners(Proto, ListenOn) of
[] -> {error, not_found};
Listeners ->
return_ok_or_error([terminate_port(SupPid) || {_, SupPid} <- Listeners])
end.

-spec(resume_port(atom(), esockd:listen_on()) -> ok | {error, term()}).
resume_port(Proto, ListenOn) ->
case match_listeners(Proto, ListenOn) of
[] -> {error, not_found};
Listeners ->
return_ok_or_error([resume(SupPid) || {_, SupPid} <- Listeners])
end.

terminate_and_delete(ChildId) ->
Expand All @@ -102,6 +122,22 @@ terminate_and_delete(ChildId) ->
Error -> Error
end.

-spec(terminate_port(pid()) -> ok | {error, tcp_close}).
terminate_port(ListenerSup) ->
try
Listener = esockd_listener_sup:listener(ListenerSup),
LSock = esockd_listener:get_sock(Listener),
gen_tcp:close(LSock)
catch _:_ ->
{error, tcp_close}
end.

-spec resume(pid()) -> ok | {error, term()}.
resume(ListenerSup) ->
Listener = esockd_listener_sup:listener(ListenerSup),
AcceptorSup = esockd_listener_sup:acceptor_sup(ListenerSup),
esockd_listener:resume_sock(Listener, AcceptorSup).

-spec(listeners() -> [{term(), pid()}]).
listeners() ->
[{Id, Pid} || {{listener_sup, Id}, Pid, _Type, _} <- supervisor:which_children(?MODULE)].
Expand Down Expand Up @@ -129,7 +165,7 @@ restart_listener(Proto, ListenOn) ->
case match_listeners(Proto, ListenOn) of
[] -> {error, not_found};
Listeners ->
return_ok_or_error([terminate_and_restart(ChildId) || ChildId <- Listeners])
return_ok_or_error([terminate_and_restart(ChildId) || {ChildId, _} <- Listeners])
end.

terminate_and_restart(ChildId) ->
Expand All @@ -139,7 +175,7 @@ terminate_and_restart(ChildId) ->
end.

match_listeners(Proto, ListenOn) ->
[ChildId || {ChildId, _Pid, _Type, _} <- supervisor:which_children(?MODULE),
[{ChildId, Pid} || {ChildId, Pid, _Type, _} <- supervisor:which_children(?MODULE),
match_listener(Proto, ListenOn, ChildId)].

match_listener(Proto, ListenOn, {listener_sup, {Proto, ListenOn}}) ->
Expand Down
18 changes: 18 additions & 0 deletions test/esockd_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,24 @@ t_allow_deny(_) ->
], esockd:get_access_rules({udp_echo, 7001})),
ok = esockd:close(udp_echo, 7001).

t_close_port_and_resume_port(_) ->
{ok, _LSup} = esockd:open(echo, {"127.0.0.1", 5000}, [binary, {packet, raw}],
{echo_server, start_link, []}),
{ok, Sock} = gen_tcp:connect("127.0.0.1", 5000, [binary, {active, false}]),
ok = gen_tcp:send(Sock, <<"Hello">>),
{ok, <<"Hello">>} = gen_tcp:recv(Sock, 0),
ok = esockd:close_port(echo, {"127.0.0.1", 5000}),
ok = gen_tcp:send(Sock, <<"Hello">>),
{ok, <<"Hello">>} = gen_tcp:recv(Sock, 0),
{error, econnrefused} = gen_tcp:connect("127.0.0.1", 5000, [binary, {active, false}]),
esockd:resume_port(echo, {"127.0.0.1", 5000}),
{ok, Sock1} = gen_tcp:connect("127.0.0.1", 5000, [binary, {active, false}]),
ok = gen_tcp:send(Sock1, <<"Hello">>),
{ok, <<"Hello">>} = gen_tcp:recv(Sock1, 0),
ok = gen_tcp:send(Sock, <<"Hello">>),
{ok, <<"Hello">>} = gen_tcp:recv(Sock, 0),
ok = esockd:close(echo, {"127.0.0.1", 5000}).

t_ulimit(_) ->
?assert(is_integer(esockd:ulimit())).

Expand Down

0 comments on commit 70b6ac8

Please sign in to comment.