diff options
author | Jan Huwald <jh@sotun.de> | 2012-06-04 12:43:43 (GMT) |
---|---|---|
committer | Jan Huwald <jh@sotun.de> | 2012-06-04 12:43:43 (GMT) |
commit | f33aa706efbcfd0663a9bda060972081ea47e8f4 (patch) | |
tree | 7a7cccbc998517bdef4e787495c46112e3540276 /ereproxy_server.erl |
Initial commit
Diffstat (limited to 'ereproxy_server.erl')
-rw-r--r-- | ereproxy_server.erl | 131 |
1 files changed, 131 insertions, 0 deletions
diff --git a/ereproxy_server.erl b/ereproxy_server.erl new file mode 100644 index 0000000..ef59144 --- /dev/null +++ b/ereproxy_server.erl @@ -0,0 +1,131 @@ +-module(ereproxy_server). +-behavior(supervisor). +-export([start_link/1, init/1, start_link_acceptor/2, acceptor/2, con/2, pass_through/1]). + +-include("ereproxy_config.hrl"). + +%%% supervisor over all connections + +start_link(CfgMod) -> + supervisor:start_link(?MODULE, CfgMod). + +init(CfgMod) -> + Cfg = CfgMod:config(), + {ok, {{one_for_one, 10, 60}, + [{{?MODULE, Listen}, % id + {?MODULE, start_link_acceptor, [CfgMod, Listen]}, + permanent, brutal_kill, worker, + [?MODULE]} + || Listen <- Cfg#cfg.listen]}}. + +%%% transport helper +tp(tcp, setup, _Args) -> ok; +tp(tcp, setopts, Args) -> apply(inet, setopts, Args); +tp(tcp, peername, Args) -> apply(inet, peername, Args); +tp(tcp, Method, Args) -> apply(gen_tcp, Method, Args); +tp(ssl, accept, Args) -> apply(ssl, transport_accept, Args); +tp(ssl, setup, Args) -> apply(ssl, ssl_accept, Args); +tp(ssl, Method, Args) -> apply(ssl, Method, Args). + +tp_c2t(tcp_closed) -> tcp; +tp_c2t(ssl_closed) -> ssl. + +tp_t2c(tcp) -> tcp_closed; +tp_t2c(ssl) -> ssl_closed. + +%%% listen and accept on a single port + +start_link_acceptor(CfgMod, {Method, ListenPort}) -> + DefaultOpts = [binary, {packet, 0}, {active, false}, {reuseaddr, true}], + {TP, Opts} = + case Method of + http -> {tcp, DefaultOpts}; + https -> {ssl, (CfgMod:config())#cfg.ssl_opts ++ DefaultOpts} + end, + case tp(TP, listen, [ListenPort, Opts]) of + {ok, Sock} -> + {ok, spawn_link(?MODULE, acceptor, [CfgMod, {TP, Sock}])}; + Error -> + error_logger:error_report( + [{?MODULE, ?LINE}, Error]), + {error, Error} + end. + +%% accept new connections and start a thread for them +acceptor(CfgMod, {TP, ListenSock}) -> + {ok, Sock} = tp(TP, accept, [ListenSock]), + Worker = spawn(?MODULE, con, [CfgMod, {TP, Sock}]), + ok = tp(TP, controlling_process, [Sock, Worker]), + ?MODULE:acceptor(CfgMod, {TP, ListenSock}). % allow code update + +%% con: care for connection until destination is known +con(CfgMod, ClientCon = {TP, ClientSock}) -> + MaxCache = (CfgMod:config())#cfg.max_peek, + ok = tp(TP, setup, [ClientSock]), + ok = tp(TP, setopts, [ClientSock, [{active, once}]]), + % get the hostname from http header + {ConHead, HostName} = parse_header(MaxCache, ClientCon), + {Host, Port} = CfgMod:select_destination(HostName), + % connect to destination + {ok, ServerSock} = gen_tcp:connect(Host, Port, [binary, {packet, 0}, {active, once}]), + % log redirection details + {ok, {SrcAddr, SrcPort}} = tp(TP, peername, [ClientSock]), + {ok, {RedAddr, RedPort}} = inet:sockname(ServerSock), + {ok, {DstAddr, DstPort}} = inet:peername(ServerSock), + ereproxy_log ! {{SrcAddr, SrcPort}, {RedAddr, RedPort}, {DstAddr, DstPort}}, + % pass through connection + gen_tcp:send(ServerSock, ConHead), + pass_through([ClientCon, {tcp, ServerSock}]). + +%% parse_header: search for the "^Host: " field + +% stop if too much data has been allocated +parse_header(NegativeCache, _, _, _, _) when NegativeCache < 0 -> + cachelimit_exceeded; + +% receive some bytes +parse_header(MaxCache, Con = {TP, Sock}, OldData, CurrentLine, _RestData = <<>>) -> + TP_Close = tp_t2c(TP), + receive + {TP, Sock, Data} -> + ok = tp(TP, setopts, [Sock, [{active, once}]]), + parse_header(MaxCache - size(Data), Con, OldData, CurrentLine, Data); + {TP_Close, Sock} -> + connection_closed + end; + +% parse: end of header (-> no Host-directive found) (two versions: LF/CRLF) +parse_header(_, _, OldData, <<>>, RestData = <<10:8, _>>) -> + {<<OldData/binary, RestData/binary>>, no_host}; +parse_header(_, _, OldData, <<13:8>>, RestData = <<10:8, _>>) -> + {<<OldData/binary, 13:8, RestData/binary>>, no_host}; +% parse: end of line, host directive +parse_header(_, _, OldData, CurrentLine = <<"Host: ", HostName/binary>>, RestData = <<10:8, _/binary>>) -> + {<<OldData/binary, CurrentLine/binary, RestData/binary>>, string:to_lower(binary_to_list(HostName) -- [$\r])}; +% parse: end of line +parse_header(MaxCache, Con, OldData, Line, <<10:8, RestData/binary>>) -> + parse_header(MaxCache, Con, <<OldData/binary, Line/binary, 10:8>>, <<>>, RestData); +% parse: any char +parse_header(MaxCache, Con, OldData, Line, <<Char:1/binary, RestData/binary>>) -> + parse_header(MaxCache, Con, OldData, <<Line/binary, Char/binary>>, RestData). + +% init all params +parse_header(MaxCache, Con) -> + parse_header(MaxCache, Con, <<>>, <<>>, <<>>). + +%% connect client and destination server +pass_through(Cons) -> + receive + {STP, Src, Data} -> + [{DTP, Dst}] = lists:subtract(Cons, [{STP, Src}]), + ok = tp(STP, setopts, [Src, [{active, once}]]), + ok = tp(DTP, send, [Dst, Data]), + ?MODULE:pass_through(Cons); % allow code update + {STP_Closed, Src} -> + STP = tp_c2t(STP_Closed), + [{DTP, Dst}] = lists:subtract(Cons, [{STP, Src}]), + ok = tp(DTP, close, [Dst]), + connections_closed + % TODO: check if this is standart compliant; we close both + % sides if one side closes one channel! + end. |