From 9cd18f3c47c6e1d0559293bf690d223d4c844e23 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Thu, 16 May 2024 17:33:51 -0300 Subject: [PATCH] Implement other Plug.Conn adapter callbacks (#2) --- .github/workflows/test.yml | 4 +- lib/kino/proxy.ex | 4 +- lib/kino_proxy/adapter.ex | 88 ++++++++++++++++++++++- lib/kino_proxy/server.ex | 80 +++++++++++++++------ mix.exs | 3 +- mix.lock | 11 --- test/kino/proxy_test.exs | 141 ++++++++++++++++++++++++++++++++----- test/support/endpoint.ex | 33 ++------- 8 files changed, 280 insertions(+), 84 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4fa339e..62de04e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -42,7 +42,7 @@ jobs: - run: mix deps.compile - # - run: mix compile --warnings-as-errors - # if: ${{ matrix.lint }} + - run: mix compile --warnings-as-errors + if: ${{ matrix.lint }} - run: mix test diff --git a/lib/kino/proxy.ex b/lib/kino/proxy.ex index 12e6b35..4cf0dc7 100644 --- a/lib/kino/proxy.ex +++ b/lib/kino/proxy.ex @@ -4,8 +4,8 @@ defmodule Kino.Proxy do # TODO: Add function docs @doc false - @spec run(pid(), Plug.Conn.t()) :: pid() - defdelegate run(pid, conn), to: KinoProxy.Server + @spec serve(pid(), Plug.Conn.t()) :: {Plug.Conn.t(), atom()} + defdelegate serve(pid, conn), to: KinoProxy.Server # TODO: Add function docs @doc false diff --git a/lib/kino_proxy/adapter.ex b/lib/kino_proxy/adapter.ex index 11f1acd..4bd8133 100644 --- a/lib/kino_proxy/adapter.ex +++ b/lib/kino_proxy/adapter.ex @@ -7,7 +7,93 @@ defmodule KinoProxy.Adapter do receive do {^ref, :ok} -> {:ok, body, {pid, ref}} - {:DOWN, ^ref, _, _, reason} -> exit({{__MODULE__, :send_resp, 4}, reason}) + {:DOWN, ^ref, _, _, reason} -> exit_fun(:send_resp, 4, reason) end end + + def get_peer_data({pid, ref}) do + send(pid, {:get_peer_data, self(), ref}) + + receive do + {^ref, peer_data} -> peer_data + {:DOWN, ^ref, _, _, reason} -> exit_fun(:get_peer_data, 1, reason) + end + end + + def get_http_protocol({pid, ref}) do + send(pid, {:get_http_protocol, self(), ref}) + + receive do + {^ref, http_protocol} -> http_protocol + {:DOWN, ^ref, _, _, reason} -> exit_fun(:get_http_protocol, 1, reason) + end + end + + def read_req_body({pid, ref}, opts) do + send(pid, {:read_req_body, self(), ref, opts}) + + receive do + {^ref, {:ok, data}} -> {:ok, data, {pid, ref}} + {^ref, {:more, data}} -> {:more, data, {pid, ref}} + {^ref, {:error, _} = error} -> error + {:DOWN, ^ref, _, _, reason} -> exit_fun(:read_req_body, 2, reason) + end + end + + def send_chunked({pid, ref}, status, headers) do + send(pid, {:send_chunked, self(), ref, status, headers}) + + receive do + {^ref, :ok} -> {:ok, nil, {pid, ref}} + {:DOWN, ^ref, _, _, reason} -> exit_fun(:send_chunked, 3, reason) + end + end + + def chunk({pid, ref}, chunk) do + send(pid, {:chunk, self(), ref, chunk}) + + receive do + {^ref, :ok} -> :ok + {^ref, {:error, _} = error} -> error + {:DOWN, ^ref, _, _, reason} -> exit_fun(:chunk, 2, reason) + end + end + + def inform({pid, ref}, status, headers) do + send(pid, {:inform, self(), ref, status, headers}) + + receive do + {^ref, :ok} -> {:ok, {pid, ref}} + {:DOWN, ^ref, _, _, reason} -> exit_fun(:inform, 3, reason) + end + end + + def send_file({pid, ref}, status, headers, path, offset, length) do + %File.Stat{type: :regular, size: size} = File.stat!(path) + + length = + cond do + length == :all -> size + is_integer(length) -> length + end + + {:ok, body} = + File.open!(path, [:read, :raw, :binary], fn device -> + :file.pread(device, offset, length) + end) + + send(pid, {:send_resp, self(), ref, status, headers, body}) + + receive do + {^ref, :ok} -> {:ok, body, {pid, ref}} + {:DOWN, ^ref, _, _, reason} -> exit_fun(:send_file, 6, reason) + end + end + + def upgrade(_payload, _protocol, _opts), do: {:error, :not_supported} + def push(_payload, _path, _headers), do: {:error, :not_supported} + + defp exit_fun(fun, arity, reason) do + exit({{__MODULE__, fun, arity}, reason}) + end end diff --git a/lib/kino_proxy/server.ex b/lib/kino_proxy/server.ex index 71b7ff3..967b097 100644 --- a/lib/kino_proxy/server.ex +++ b/lib/kino_proxy/server.ex @@ -2,35 +2,73 @@ defmodule KinoProxy.Server do # TODO: Add doc comments @moduledoc false - @proxy_params ["id", "path"] - - def run(pid, %Plug.Conn{params: %{"path" => path_info}} = conn) when is_pid(pid) do - # TODO: We don't want to pass the whole connection - # but only certain fields, and then rebuild it on the client - %{plug_session: session_data} = conn.private - request_path = "/" <> Enum.join(path_info, "/") - private = %{plug_session: session_data} - params = Map.drop(conn.params, @proxy_params) - path_params = Map.drop(conn.path_params, @proxy_params) - - conn = %{ - conn - | request_path: request_path, - path_info: path_info, - params: params, - path_params: path_params, - private: private - } + import Plug.Conn - spawn_pid = GenServer.call(pid, {:request, conn, self()}) + def serve(pid, %Plug.Conn{} = conn) do + spawn_pid = GenServer.call(pid, {:request, build_client_conn(conn), self()}) monitor_ref = Process.monitor(spawn_pid) + loop(monitor_ref, conn) end + defp build_client_conn(conn) do + %Plug.Conn{ + host: conn.host, + method: conn.method, + owner: conn.owner, + port: conn.port, + remote_ip: conn.remote_ip, + query_string: conn.query_string, + path_info: conn.path_info, + scheme: conn.scheme, + script_name: conn.script_name, + req_headers: conn.req_headers + } + end + defp loop(monitor_ref, conn) do receive do {:send_resp, pid, ref, status, headers, body} -> - conn = Plug.Conn.send_resp(%{conn | resp_headers: headers}, status, body) + conn = send_resp(%{conn | resp_headers: headers}, status, body) + send(pid, {ref, :ok}) + loop(monitor_ref, conn) + + {:get_peer_data, pid, ref} -> + send(pid, {ref, get_peer_data(conn)}) + loop(monitor_ref, conn) + + {:get_http_protocol, pid, ref} -> + send(pid, {ref, get_http_protocol(conn)}) + loop(monitor_ref, conn) + + {:read_req_body, pid, ref, opts} -> + {message, conn} = + case read_body(conn, opts) do + {:ok, data, conn} -> {{:ok, data}, conn} + {:more, data, conn} -> {{:more, data}, conn} + {:error, _} = error -> {error, conn} + end + + send(pid, {ref, message}) + loop(monitor_ref, conn) + + {:send_chunked, pid, ref, status, headers} -> + conn = send_chunked(%{conn | resp_headers: headers}, status) + send(pid, {ref, :ok}) + loop(monitor_ref, conn) + + {:chunk, pid, ref, chunk} -> + {message, conn} = + case chunk(conn, chunk) do + {:error, _} = error -> {error, conn} + result -> result + end + + send(pid, {ref, message}) + loop(monitor_ref, conn) + + {:inform, pid, ref, status, headers} -> + conn = inform(conn, status, headers) send(pid, {ref, :ok}) loop(monitor_ref, conn) diff --git a/mix.exs b/mix.exs index 52560f7..7ba0f04 100644 --- a/mix.exs +++ b/mix.exs @@ -37,8 +37,7 @@ defmodule KinoProxy.MixProject do defp deps do [ {:plug, "~> 1.15.3"}, - {:bandit, "~> 1.5", only: :test}, - {:req, "~> 0.4.14", only: :test} + {:jason, "~> 1.4.1", only: :test} ] end diff --git a/mix.lock b/mix.lock index 49f9133..3a978fb 100644 --- a/mix.lock +++ b/mix.lock @@ -1,18 +1,7 @@ %{ - "bandit": {:hex, :bandit, "1.5.0", "3bc864a0da7f013ad3713a7f550c6a6ec0e19b8d8715ec678256a0dc197d5539", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "92d18d9a7228a597e0d4661ef69a874ea82d63ff49c7d801a5c68cb18ebbbd72"}, - "castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"}, - "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, - "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, - "mint": {:hex, :mint, "1.6.0", "88a4f91cd690508a04ff1c3e28952f322528934be541844d54e0ceb765f01d5e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "3c5ae85d90a5aca0a49c0d8b67360bbe407f3b54f1030a111047ff988e8fefaa"}, - "nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"}, - "nimble_ownership": {:hex, :nimble_ownership, "0.3.1", "99d5244672fafdfac89bfad3d3ab8f0d367603ce1dc4855f86a1c75008bce56f", [:mix], [], "hexpm", "4bf510adedff0449a1d6e200e43e57a814794c8b5b6439071274d248d272a549"}, - "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"}, "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, - "req": {:hex, :req, "0.4.14", "103de133a076a31044e5458e0f850d5681eef23dfabf3ea34af63212e3b902e2", [:mix], [{:aws_signature, "~> 0.3.2", [hex: :aws_signature, repo: "hexpm", optional: true]}, {:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:nimble_ownership, "~> 0.2.0 or ~> 0.3.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "2ddd3d33f9ab714ced8d3c15fd03db40c14dbf129003c4a3eb80fac2cc0b1b08"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, - "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, - "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, } diff --git a/test/kino/proxy_test.exs b/test/kino/proxy_test.exs index 63b17b8..4c4817a 100644 --- a/test/kino/proxy_test.exs +++ b/test/kino/proxy_test.exs @@ -1,32 +1,137 @@ defmodule Kino.ProxyTest do use ExUnit.Case, async: true + use Plug.Test - setup do - pid = start_supervised!({Bandit, plug: KinoProxy.Endpoint, scheme: :http, port: 0}) - {:ok, {_address, port}} = ThousandIsland.listener_info(pid) - req = Req.new(base_url: "http://localhost:#{port}", retry: false) + test "returns the user-defined response" do + Kino.Proxy.listen(fn conn -> + assert get_req_header(conn, "x-auth-token") == ["foo-bar"] + assert conn.path_info == ["foo", "bar"] + + conn + |> put_resp_header("content-type", "text/plain") + |> send_resp(200, "it works!") + end) + + assert %{resp_body: "it works!", status: 200} = + conn(:get, "/123/proxy/foo/bar") + |> put_req_header("x-auth-token", "foo-bar") + |> run_endpoint() + end - {:ok, req: req} + test "returns the peer data" do + Kino.Proxy.listen(fn conn -> + assert get_peer_data(conn) == %{ + port: 111_317, + address: {127, 0, 0, 1}, + ssl_cert: nil + } + + send_resp(conn, 200, "it works!") + end) + + conn = conn(:get, "/123/proxy/") + assert %{resp_body: "it works!", status: 200} = run_endpoint(conn) end - test "it works", %{req: req} do + test "returns the http protocol" do Kino.Proxy.listen(fn conn -> - # For test assertive purposes - assert Plug.Conn.get_req_header(conn, "x-auth-token") == ["foo-bar"] - assert conn.request_path == "/foo/bar" + assert get_http_protocol(conn) == :"HTTP/1.1" + send_resp(conn, 200, "it works!") + end) + + conn = conn(:get, "/123/proxy/") + assert %{resp_body: "it works!", status: 200} = run_endpoint(conn) + end + + test "reads the body" do + body = :crypto.strong_rand_bytes(20 * 1024 * 1024) + + Kino.Proxy.listen(fn conn -> + stream = + Stream.resource( + fn -> {conn, 0} end, + fn + {:halt, acc} -> + {:halt, acc} + + {conn, count} -> + case read_body(conn) do + {:ok, body, conn} -> + count = count + byte_size(body) + {[body], {:halt, {conn, count}}} + + {:more, body, conn} -> + count = count + byte_size(body) + {[body], {conn, count}} + end + end, + fn {result, _count} -> result end + ) + + assert Enum.join(stream) == body + send_resp(conn, 200, body) + end) + + conn = conn(:get, "/123/proxy/", body) + + assert %{resp_body: ^body, status: 200} = run_endpoint(conn) + assert_receive {_ref, {200, _headers, ^body}} + end + test "sends chunked response" do + chunk = :crypto.strong_rand_bytes(10 * 1024 * 1024) + other_chunk = :crypto.strong_rand_bytes(10 * 1024 * 1024) + + Kino.Proxy.listen(fn conn -> + conn = send_chunked(conn, 200) + assert conn.state == :chunked + + {:ok, _conn} = chunk(conn, chunk) + {:ok, _conn} = chunk(conn, other_chunk) + + conn + end) + + conn = conn(:get, "/123/proxy/") + assert %{resp_body: body, status: 200} = run_endpoint(conn) + assert body == chunk <> other_chunk + end + + test "fails to upgrade with unsupported http protocol" do + Kino.Proxy.listen(fn conn -> + assert_raise ArgumentError, "upgrade to HTTP/2.0 not supported by KinoProxy.Adapter", fn -> + upgrade_adapter(conn, :"HTTP/2.0", []) + end + end) + + conn = conn(:get, "/123/proxy/") + run_endpoint(conn) + end + + test "returns the inform" do + Kino.Proxy.listen(fn conn -> conn - |> Plug.Conn.put_resp_header("content-type", "text/plain") - |> Plug.Conn.send_resp(200, "it works!") + |> inform!(199) + |> send_resp(200, "it works!") end) - response = - Req.get!(req, - url: "/123/proxy/foo/bar", - headers: [{"x-auth-token", "foo-bar"}] - ) + conn = conn(:get, "/123/proxy/") + assert %{resp_body: "it works!", status: 200} = run_endpoint(conn) + end + + test "sends a response with a file to be downloaded" do + file = __ENV__.file + body = File.read!(file) + + Kino.Proxy.listen(fn conn -> + send_file(conn, 200, file) + end) + + conn = conn(:get, "/123/proxy/") + assert %{resp_body: ^body, status: 200} = run_endpoint(conn) + end - assert response.status == 200 - assert response.body == "it works!" + defp run_endpoint(conn, opts \\ []) do + KinoProxy.Endpoint.call(conn, opts) end end diff --git a/test/support/endpoint.ex b/test/support/endpoint.ex index 841aca8..39c9e11 100644 --- a/test/support/endpoint.ex +++ b/test/support/endpoint.ex @@ -5,41 +5,20 @@ defmodule KinoProxy.Endpoint do plug Plug.RequestId - plug Plug.Parsers, - parsers: [:urlencoded, :multipart, :json], - pass: ["*/*"], - json_decoder: Jason - - plug Plug.MethodOverride - plug Plug.Head + plug :dispatch - plug Plug.Session, - store: :cookie, - key: "lb_session", - signing_salt: "deadbook" + match "/:id/proxy/*proxied_path" do + %{path_info: [id, "proxy" | path_info]} = conn - plug :fetch_session - plug :dispatch + script_name = [id, "proxy"] + conn = %{conn | path_info: path_info, script_name: conn.script_name ++ script_name} - match "/:id/proxy/*path" do if pid = GenServer.whereis(Kino.Proxy) do - {conn, _reason} = Kino.Proxy.run(pid, conn) + {conn, _reason} = Kino.Proxy.serve(pid, conn) conn else json = Jason.encode!(%{error: %{details: "Not Found"}}) Plug.Conn.send_resp(conn, 404, json) end end - - def fetch_query_string(conn, _opts) do - Plug.Conn.fetch_query_params(conn) - end - - def fetch_req_cookies(conn, _opts) do - Plug.Conn.fetch_cookies(conn) - end - - def fetch_req_session(conn, _opts) do - Plug.Conn.fetch_session(conn) - end end