Skip to content

Commit

Permalink
Implement other Plug.Conn adapter callbacks (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
aleDsz committed May 16, 2024
1 parent 2b08342 commit 9cd18f3
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 84 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:

- run: mix deps.compile

# - run: mix compile --warnings-as-errors
# if: ${{ matrix.lint }}
- run: mix compile --warnings-as-errors
if: ${{ matrix.lint }}

- run: mix test
4 changes: 2 additions & 2 deletions lib/kino/proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ defmodule Kino.Proxy do

# TODO: Add function docs
@doc false
@spec run(pid(), Plug.Conn.t()) :: pid()
defdelegate run(pid, conn), to: KinoProxy.Server
@spec serve(pid(), Plug.Conn.t()) :: {Plug.Conn.t(), atom()}
defdelegate serve(pid, conn), to: KinoProxy.Server

# TODO: Add function docs
@doc false
Expand Down
88 changes: 87 additions & 1 deletion lib/kino_proxy/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,93 @@ defmodule KinoProxy.Adapter do

receive do
{^ref, :ok} -> {:ok, body, {pid, ref}}
{:DOWN, ^ref, _, _, reason} -> exit({{__MODULE__, :send_resp, 4}, reason})
{:DOWN, ^ref, _, _, reason} -> exit_fun(:send_resp, 4, reason)
end
end

def get_peer_data({pid, ref}) do
send(pid, {:get_peer_data, self(), ref})

receive do
{^ref, peer_data} -> peer_data
{:DOWN, ^ref, _, _, reason} -> exit_fun(:get_peer_data, 1, reason)
end
end

def get_http_protocol({pid, ref}) do
send(pid, {:get_http_protocol, self(), ref})

receive do
{^ref, http_protocol} -> http_protocol
{:DOWN, ^ref, _, _, reason} -> exit_fun(:get_http_protocol, 1, reason)
end
end

def read_req_body({pid, ref}, opts) do
send(pid, {:read_req_body, self(), ref, opts})

receive do
{^ref, {:ok, data}} -> {:ok, data, {pid, ref}}
{^ref, {:more, data}} -> {:more, data, {pid, ref}}
{^ref, {:error, _} = error} -> error
{:DOWN, ^ref, _, _, reason} -> exit_fun(:read_req_body, 2, reason)
end
end

def send_chunked({pid, ref}, status, headers) do
send(pid, {:send_chunked, self(), ref, status, headers})

receive do
{^ref, :ok} -> {:ok, nil, {pid, ref}}
{:DOWN, ^ref, _, _, reason} -> exit_fun(:send_chunked, 3, reason)
end
end

def chunk({pid, ref}, chunk) do
send(pid, {:chunk, self(), ref, chunk})

receive do
{^ref, :ok} -> :ok
{^ref, {:error, _} = error} -> error
{:DOWN, ^ref, _, _, reason} -> exit_fun(:chunk, 2, reason)
end
end

def inform({pid, ref}, status, headers) do
send(pid, {:inform, self(), ref, status, headers})

receive do
{^ref, :ok} -> {:ok, {pid, ref}}
{:DOWN, ^ref, _, _, reason} -> exit_fun(:inform, 3, reason)
end
end

def send_file({pid, ref}, status, headers, path, offset, length) do
%File.Stat{type: :regular, size: size} = File.stat!(path)

length =
cond do
length == :all -> size
is_integer(length) -> length
end

{:ok, body} =
File.open!(path, [:read, :raw, :binary], fn device ->
:file.pread(device, offset, length)
end)

send(pid, {:send_resp, self(), ref, status, headers, body})

receive do
{^ref, :ok} -> {:ok, body, {pid, ref}}
{:DOWN, ^ref, _, _, reason} -> exit_fun(:send_file, 6, reason)
end
end

def upgrade(_payload, _protocol, _opts), do: {:error, :not_supported}
def push(_payload, _path, _headers), do: {:error, :not_supported}

defp exit_fun(fun, arity, reason) do
exit({{__MODULE__, fun, arity}, reason})
end
end
80 changes: 59 additions & 21 deletions lib/kino_proxy/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,73 @@ defmodule KinoProxy.Server do
# TODO: Add doc comments
@moduledoc false

@proxy_params ["id", "path"]

def run(pid, %Plug.Conn{params: %{"path" => path_info}} = conn) when is_pid(pid) do
# TODO: We don't want to pass the whole connection
# but only certain fields, and then rebuild it on the client
%{plug_session: session_data} = conn.private
request_path = "/" <> Enum.join(path_info, "/")
private = %{plug_session: session_data}
params = Map.drop(conn.params, @proxy_params)
path_params = Map.drop(conn.path_params, @proxy_params)

conn = %{
conn
| request_path: request_path,
path_info: path_info,
params: params,
path_params: path_params,
private: private
}
import Plug.Conn

spawn_pid = GenServer.call(pid, {:request, conn, self()})
def serve(pid, %Plug.Conn{} = conn) do
spawn_pid = GenServer.call(pid, {:request, build_client_conn(conn), self()})
monitor_ref = Process.monitor(spawn_pid)

loop(monitor_ref, conn)
end

defp build_client_conn(conn) do
%Plug.Conn{
host: conn.host,
method: conn.method,
owner: conn.owner,
port: conn.port,
remote_ip: conn.remote_ip,
query_string: conn.query_string,
path_info: conn.path_info,
scheme: conn.scheme,
script_name: conn.script_name,
req_headers: conn.req_headers
}
end

defp loop(monitor_ref, conn) do
receive do
{:send_resp, pid, ref, status, headers, body} ->
conn = Plug.Conn.send_resp(%{conn | resp_headers: headers}, status, body)
conn = send_resp(%{conn | resp_headers: headers}, status, body)
send(pid, {ref, :ok})
loop(monitor_ref, conn)

{:get_peer_data, pid, ref} ->
send(pid, {ref, get_peer_data(conn)})
loop(monitor_ref, conn)

{:get_http_protocol, pid, ref} ->
send(pid, {ref, get_http_protocol(conn)})
loop(monitor_ref, conn)

{:read_req_body, pid, ref, opts} ->
{message, conn} =
case read_body(conn, opts) do
{:ok, data, conn} -> {{:ok, data}, conn}
{:more, data, conn} -> {{:more, data}, conn}
{:error, _} = error -> {error, conn}
end

send(pid, {ref, message})
loop(monitor_ref, conn)

{:send_chunked, pid, ref, status, headers} ->
conn = send_chunked(%{conn | resp_headers: headers}, status)
send(pid, {ref, :ok})
loop(monitor_ref, conn)

{:chunk, pid, ref, chunk} ->
{message, conn} =
case chunk(conn, chunk) do
{:error, _} = error -> {error, conn}
result -> result
end

send(pid, {ref, message})
loop(monitor_ref, conn)

{:inform, pid, ref, status, headers} ->
conn = inform(conn, status, headers)
send(pid, {ref, :ok})
loop(monitor_ref, conn)

Expand Down
3 changes: 1 addition & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ defmodule KinoProxy.MixProject do
defp deps do
[
{:plug, "~> 1.15.3"},
{:bandit, "~> 1.5", only: :test},
{:req, "~> 0.4.14", only: :test}
{:jason, "~> 1.4.1", only: :test}
]
end

Expand Down
11 changes: 0 additions & 11 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,18 +1,7 @@
%{
"bandit": {:hex, :bandit, "1.5.0", "3bc864a0da7f013ad3713a7f550c6a6ec0e19b8d8715ec678256a0dc197d5539", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "92d18d9a7228a597e0d4661ef69a874ea82d63ff49c7d801a5c68cb18ebbbd72"},
"castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"},
"finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"mint": {:hex, :mint, "1.6.0", "88a4f91cd690508a04ff1c3e28952f322528934be541844d54e0ceb765f01d5e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "3c5ae85d90a5aca0a49c0d8b67360bbe407f3b54f1030a111047ff988e8fefaa"},
"nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
"nimble_ownership": {:hex, :nimble_ownership, "0.3.1", "99d5244672fafdfac89bfad3d3ab8f0d367603ce1dc4855f86a1c75008bce56f", [:mix], [], "hexpm", "4bf510adedff0449a1d6e200e43e57a814794c8b5b6439071274d248d272a549"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"},
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
"req": {:hex, :req, "0.4.14", "103de133a076a31044e5458e0f850d5681eef23dfabf3ea34af63212e3b902e2", [:mix], [{:aws_signature, "~> 0.3.2", [hex: :aws_signature, repo: "hexpm", optional: true]}, {:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:nimble_ownership, "~> 0.2.0 or ~> 0.3.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "2ddd3d33f9ab714ced8d3c15fd03db40c14dbf129003c4a3eb80fac2cc0b1b08"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
}
141 changes: 123 additions & 18 deletions test/kino/proxy_test.exs
Original file line number Diff line number Diff line change
@@ -1,32 +1,137 @@
defmodule Kino.ProxyTest do
use ExUnit.Case, async: true
use Plug.Test

setup do
pid = start_supervised!({Bandit, plug: KinoProxy.Endpoint, scheme: :http, port: 0})
{:ok, {_address, port}} = ThousandIsland.listener_info(pid)
req = Req.new(base_url: "http://localhost:#{port}", retry: false)
test "returns the user-defined response" do
Kino.Proxy.listen(fn conn ->
assert get_req_header(conn, "x-auth-token") == ["foo-bar"]
assert conn.path_info == ["foo", "bar"]

conn
|> put_resp_header("content-type", "text/plain")
|> send_resp(200, "it works!")
end)

assert %{resp_body: "it works!", status: 200} =
conn(:get, "/123/proxy/foo/bar")
|> put_req_header("x-auth-token", "foo-bar")
|> run_endpoint()
end

{:ok, req: req}
test "returns the peer data" do
Kino.Proxy.listen(fn conn ->
assert get_peer_data(conn) == %{
port: 111_317,
address: {127, 0, 0, 1},
ssl_cert: nil
}

send_resp(conn, 200, "it works!")
end)

conn = conn(:get, "/123/proxy/")
assert %{resp_body: "it works!", status: 200} = run_endpoint(conn)
end

test "it works", %{req: req} do
test "returns the http protocol" do
Kino.Proxy.listen(fn conn ->
# For test assertive purposes
assert Plug.Conn.get_req_header(conn, "x-auth-token") == ["foo-bar"]
assert conn.request_path == "/foo/bar"
assert get_http_protocol(conn) == :"HTTP/1.1"
send_resp(conn, 200, "it works!")
end)

conn = conn(:get, "/123/proxy/")
assert %{resp_body: "it works!", status: 200} = run_endpoint(conn)
end

test "reads the body" do
body = :crypto.strong_rand_bytes(20 * 1024 * 1024)

Kino.Proxy.listen(fn conn ->
stream =
Stream.resource(
fn -> {conn, 0} end,
fn
{:halt, acc} ->
{:halt, acc}

{conn, count} ->
case read_body(conn) do
{:ok, body, conn} ->
count = count + byte_size(body)
{[body], {:halt, {conn, count}}}

{:more, body, conn} ->
count = count + byte_size(body)
{[body], {conn, count}}
end
end,
fn {result, _count} -> result end
)

assert Enum.join(stream) == body
send_resp(conn, 200, body)
end)

conn = conn(:get, "/123/proxy/", body)

assert %{resp_body: ^body, status: 200} = run_endpoint(conn)
assert_receive {_ref, {200, _headers, ^body}}
end

test "sends chunked response" do
chunk = :crypto.strong_rand_bytes(10 * 1024 * 1024)
other_chunk = :crypto.strong_rand_bytes(10 * 1024 * 1024)

Kino.Proxy.listen(fn conn ->
conn = send_chunked(conn, 200)
assert conn.state == :chunked

{:ok, _conn} = chunk(conn, chunk)
{:ok, _conn} = chunk(conn, other_chunk)

conn
end)

conn = conn(:get, "/123/proxy/")
assert %{resp_body: body, status: 200} = run_endpoint(conn)
assert body == chunk <> other_chunk
end

test "fails to upgrade with unsupported http protocol" do
Kino.Proxy.listen(fn conn ->
assert_raise ArgumentError, "upgrade to HTTP/2.0 not supported by KinoProxy.Adapter", fn ->
upgrade_adapter(conn, :"HTTP/2.0", [])
end
end)

conn = conn(:get, "/123/proxy/")
run_endpoint(conn)
end

test "returns the inform" do
Kino.Proxy.listen(fn conn ->
conn
|> Plug.Conn.put_resp_header("content-type", "text/plain")
|> Plug.Conn.send_resp(200, "it works!")
|> inform!(199)
|> send_resp(200, "it works!")
end)

response =
Req.get!(req,
url: "/123/proxy/foo/bar",
headers: [{"x-auth-token", "foo-bar"}]
)
conn = conn(:get, "/123/proxy/")
assert %{resp_body: "it works!", status: 200} = run_endpoint(conn)
end

test "sends a response with a file to be downloaded" do
file = __ENV__.file
body = File.read!(file)

Kino.Proxy.listen(fn conn ->
send_file(conn, 200, file)
end)

conn = conn(:get, "/123/proxy/")
assert %{resp_body: ^body, status: 200} = run_endpoint(conn)
end

assert response.status == 200
assert response.body == "it works!"
defp run_endpoint(conn, opts \\ []) do
KinoProxy.Endpoint.call(conn, opts)
end
end
Loading

0 comments on commit 9cd18f3

Please sign in to comment.