diff options
author | Alex S <alex.strizhakov@gmail.com> | 2019-08-13 14:37:19 +0300 |
---|---|---|
committer | Alex S <alex.strizhakov@gmail.com> | 2019-08-20 12:39:53 +0300 |
commit | 2caf9ad95424b0a4f47c1e22ce9d57a29fcf9fbb (patch) | |
tree | 9ffd3a5ac15c0f1e27dcb174a3448eb5944f7cd7 | |
parent | c51aa48e60103307307c7c40c2d046719def7054 (diff) | |
download | pleroma-2caf9ad95424b0a4f47c1e22ce9d57a29fcf9fbb.tar.gz |
added gun connections genserver
-rw-r--r-- | config/test.exs | 2 | ||||
-rw-r--r-- | lib/pleroma/gun/api/api.ex | 15 | ||||
-rw-r--r-- | lib/pleroma/gun/api/mock.ex | 40 | ||||
-rw-r--r-- | lib/pleroma/gun/conn.ex | 17 | ||||
-rw-r--r-- | lib/pleroma/gun/connections.ex | 104 | ||||
-rw-r--r-- | lib/pleroma/http/connection.ex | 1 | ||||
-rw-r--r-- | lib/pleroma/reverse_proxy/client.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/reverse_proxy/client/tesla.ex | 10 | ||||
-rw-r--r-- | mix.exs | 2 | ||||
-rw-r--r-- | mix.lock | 2 | ||||
-rw-r--r-- | test/gun/connections_test.exs | 172 | ||||
-rw-r--r-- | test/reverse_proxy/reverse_proxy_test.exs | 33 |
12 files changed, 393 insertions, 7 deletions
diff --git a/config/test.exs b/config/test.exs index 6f75f39b5..ca916d59e 100644 --- a/config/test.exs +++ b/config/test.exs @@ -85,6 +85,8 @@ config :joken, default_signer: "yU8uHKq+yyAkZ11Hx//jcdacWc8yQ1bxAAGrplzB0Zwwjkp3 config :pleroma, Pleroma.ReverseProxy.Client, Pleroma.ReverseProxy.ClientMock +config :pleroma, Pleroma.Gun.API, Pleroma.Gun.API.Mock + try do import_config "test.secret.exs" rescue diff --git a/lib/pleroma/gun/api/api.ex b/lib/pleroma/gun/api/api.ex new file mode 100644 index 000000000..c69ab890e --- /dev/null +++ b/lib/pleroma/gun/api/api.ex @@ -0,0 +1,15 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.API do + @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()} + + def open(host, port, opts) do + api().open(host, port, opts) + end + + defp api do + Pleroma.Config.get([Pleroma.Gun.API], :gun) + end +end diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex new file mode 100644 index 000000000..3348715c4 --- /dev/null +++ b/lib/pleroma/gun/api/mock.ex @@ -0,0 +1,40 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.API.Mock do + @behaviour Pleroma.Gun.API + @impl Pleroma.Gun.API + def open('some-domain.com', 80, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + send(genserver_pid, {:gun_up, conn_pid, :http}) + {:ok, conn_pid} + end + + def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + send(genserver_pid, {:gun_up, conn_pid, :https}) + {:ok, conn_pid} + end + + @impl Pleroma.Gun.API + def open('gun_down.com', _port, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) + {:ok, conn_pid} + end + + @impl Pleroma.Gun.API + def open('gun_down_and_up.com', _port, %{genserver_pid: genserver_pid}) do + {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) + + {:ok, _} = + Task.start_link(fn -> + Process.sleep(500) + send(genserver_pid, {:gun_up, conn_pid, :http}) + end) + + {:ok, conn_pid} + end +end diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex new file mode 100644 index 000000000..62ef146a1 --- /dev/null +++ b/lib/pleroma/gun/conn.ex @@ -0,0 +1,17 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.Conn do + @moduledoc """ + Struct for gun connection data + """ + @type t :: %__MODULE__{ + conn: pid(), + state: atom(), + waiting_pids: [pid()], + protocol: atom() + } + + defstruct conn: nil, state: :open, waiting_pids: [], protocol: :http +end diff --git a/lib/pleroma/gun/connections.ex b/lib/pleroma/gun/connections.ex new file mode 100644 index 000000000..60ec68d89 --- /dev/null +++ b/lib/pleroma/gun/connections.ex @@ -0,0 +1,104 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Gun.Connections do + use GenServer + + @type domain :: String.t() + @type conn :: Gun.Conn.t() + @type t :: %__MODULE__{ + conns: %{domain() => conn()} + } + + defstruct conns: %{} + + def start_link(name \\ __MODULE__) do + if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do + GenServer.start_link(__MODULE__, [], name: name) + else + :ignore + end + end + + @impl true + def init(_) do + {:ok, %__MODULE__{conns: %{}}} + end + + @spec get_conn(atom(), String.t(), keyword()) :: pid() + def get_conn(name \\ __MODULE__, url, opts \\ []) do + opts = Enum.into(opts, %{}) + uri = URI.parse(url) + + opts = if uri.scheme == "https", do: Map.put(opts, :transport, :tls), else: opts + + GenServer.call( + name, + {:conn, %{opts: opts, uri: uri}} + ) + end + + @spec get_state(atom()) :: t() + def get_state(name \\ __MODULE__) do + GenServer.call(name, {:state}) + end + + @impl true + def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do + key = compose_key(uri) + + case state.conns[key] do + %{conn: conn, state: conn_state} when conn_state == :up -> + {:reply, conn, state} + + %{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] -> + state = put_in(state.conns[key].waiting_pids, [from | pids]) + {:noreply, state} + + nil -> + {:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts) + + state = + put_in(state.conns[key], %Pleroma.Gun.Conn{ + conn: conn, + waiting_pids: [from], + protocol: String.to_atom(uri.scheme) + }) + + {:noreply, state} + end + end + + @impl true + def handle_call({:state}, _from, state), do: {:reply, state, state} + + @impl true + def handle_info({:gun_up, conn_pid, protocol}, state) do + {key, conn} = find_conn(state.conns, conn_pid, protocol) + + # Send to all waiting processes connection pid + Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end) + + # Update state of the current connection and set waiting_pids to empty list + state = put_in(state.conns[key], %{conn | state: :up, waiting_pids: []}) + {:noreply, state} + end + + @impl true + # Do we need to do something with killed & unprocessed references? + def handle_info({:gun_down, conn_pid, protocol, _reason, _killed, _unprocessed}, state) do + {key, conn} = find_conn(state.conns, conn_pid, protocol) + + # We don't want to block requests to GenServer if gun send down message, return nil, so we can make some retries, while connection is not up + Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end) + + state = put_in(state.conns[key].state, :down) + {:noreply, state} + end + + defp compose_key(uri), do: uri.host <> ":" <> to_string(uri.port) + + defp find_conn(conns, conn_pid, protocol), + do: Enum.find(conns, fn {_, conn} -> conn.conn == conn_pid and conn.protocol == protocol end) +end diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex index ff03a3ee3..6cb26c0fe 100644 --- a/lib/pleroma/http/connection.ex +++ b/lib/pleroma/http/connection.ex @@ -9,7 +9,6 @@ defmodule Pleroma.HTTP.Connection do @options [ connect_timeout: 10_000, - protocols: [:http], timeout: 20_000 ] diff --git a/lib/pleroma/reverse_proxy/client.ex b/lib/pleroma/reverse_proxy/client.ex index 42f2ff13b..71c2b2911 100644 --- a/lib/pleroma/reverse_proxy/client.ex +++ b/lib/pleroma/reverse_proxy/client.ex @@ -28,6 +28,6 @@ defmodule Pleroma.ReverseProxy.Client do def close(ref), do: client().close(ref) defp client do - Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney) + Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Tesla) end end diff --git a/lib/pleroma/reverse_proxy/client/tesla.ex b/lib/pleroma/reverse_proxy/client/tesla.ex index b1498a5a4..fad577ec1 100644 --- a/lib/pleroma/reverse_proxy/client/tesla.ex +++ b/lib/pleroma/reverse_proxy/client/tesla.ex @@ -1,3 +1,7 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-onl + defmodule Pleroma.ReverseProxy.Client.Tesla do @behaviour Pleroma.ReverseProxy.Client @@ -6,7 +10,7 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do def request(method, url, headers, body, opts \\ []) do adapter_opts = Keyword.get(opts, :adapter, []) - |> Keyword.put(:chunks_response, true) + |> Keyword.put(:body_as, :chunks) with {:ok, response} <- Pleroma.HTTP.request( @@ -44,13 +48,13 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do adapter.read_chunk(pid, stream, opts) end - def close(client) do + def close(pid) do adapter = Application.get_env(:tesla, :adapter) unless adapter in @adapters do raise "#{adapter} doesn't support closing connection" end - adapter.close(client) + adapter.close(pid) end end @@ -113,7 +113,7 @@ defmodule Pleroma.Mixfile do {:poison, "~> 3.0", override: true}, {:tesla, github: "alex-strizhakov/tesla", - ref: "beb8927358dfaa66ecd458df607befde12dd56e0", + ref: "c29a7fd030fa6decbf7091152f563fe322e2b589", override: true}, {:cowlib, "~> 2.6.0", override: true}, {:gun, "~> 1.3"}, @@ -85,7 +85,7 @@ "swoosh": {:hex, :swoosh, "0.23.2", "7dda95ff0bf54a2298328d6899c74dae1223777b43563ccebebb4b5d2b61df38", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"}, "syslog": {:git, "https://github.com/Vagabond/erlang-syslog.git", "4a6c6f2c996483e86c1320e9553f91d337bcb6aa", [tag: "1.0.5"]}, "telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"}, - "tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "beb8927358dfaa66ecd458df607befde12dd56e0", [ref: "beb8927358dfaa66ecd458df607befde12dd56e0"]}, + "tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "c29a7fd030fa6decbf7091152f563fe322e2b589", [ref: "c29a7fd030fa6decbf7091152f563fe322e2b589"]}, "timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm"}, "trailing_format_plug": {:hex, :trailing_format_plug, "0.0.7", "64b877f912cf7273bed03379936df39894149e35137ac9509117e59866e10e45", [:mix], [{:plug, "> 0.12.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "tzdata": {:hex, :tzdata, "0.5.21", "8cbf3607fcce69636c672d5be2bbb08687fe26639a62bdcc283d267277db7cf0", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/test/gun/connections_test.exs b/test/gun/connections_test.exs new file mode 100644 index 000000000..2ec8f3993 --- /dev/null +++ b/test/gun/connections_test.exs @@ -0,0 +1,172 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Gun.ConnectionsTest do + use ExUnit.Case, async: true + alias Pleroma.Gun.{Connections, Conn, API} + + setup do + name = :test_gun_connections + {:ok, pid} = Connections.start_link(name) + + {:ok, name: name, pid: pid} + end + + test "opens connection and reuse it on next request", %{name: name, pid: pid} do + conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid) + + assert is_pid(conn) + assert Process.alive?(conn) + + reused_conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid) + + assert conn == reused_conn + + %Connections{ + conns: %{ + "some-domain.com:80" => %Conn{ + conn: ^conn, + state: :up, + waiting_pids: [] + } + } + } = Connections.get_state(name) + end + + test "reuses connection based on protocol", %{name: name, pid: pid} do + conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid) + assert is_pid(conn) + assert Process.alive?(conn) + + https_conn = Connections.get_conn(name, "https://some-domain.com", genserver_pid: pid) + + refute conn == https_conn + + reused_https = Connections.get_conn(name, "https://some-domain.com", genserver_pid: pid) + + refute conn == reused_https + + assert reused_https == https_conn + + %Connections{ + conns: %{ + "some-domain.com:80" => %Conn{ + conn: ^conn, + state: :up, + waiting_pids: [] + }, + "some-domain.com:443" => %Conn{ + conn: ^https_conn, + state: :up, + waiting_pids: [] + } + } + } = Connections.get_state(name) + end + + test "process gun_down message", %{name: name, pid: pid} do + conn = Connections.get_conn(name, "http://gun_down.com", genserver_pid: pid) + + refute conn + + %Connections{ + conns: %{ + "gun_down.com:80" => %Conn{ + conn: _, + state: :down, + waiting_pids: _ + } + } + } = Connections.get_state(name) + end + + test "process gun_down message and then gun_up", %{name: name, pid: pid} do + conn = Connections.get_conn(name, "http://gun_down_and_up.com", genserver_pid: pid) + + refute conn + + %Connections{ + conns: %{ + "gun_down_and_up.com:80" => %Conn{ + conn: _, + state: :down, + waiting_pids: _ + } + } + } = Connections.get_state(name) + + conn = Connections.get_conn(name, "http://gun_down_and_up.com", genserver_pid: pid) + + assert is_pid(conn) + assert Process.alive?(conn) + + %Connections{ + conns: %{ + "gun_down_and_up.com:80" => %Conn{ + conn: _, + state: :up, + waiting_pids: [] + } + } + } = Connections.get_state(name) + end + + test "async processes get same conn for same domain", %{name: name, pid: pid} do + tasks = + for _ <- 1..5 do + Task.async(fn -> + Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid) + end) + end + + tasks_with_results = Task.yield_many(tasks) + + results = + Enum.map(tasks_with_results, fn {task, res} -> + res || Task.shutdown(task, :brutal_kill) + end) + + conns = for {:ok, value} <- results, do: value + + %Connections{ + conns: %{ + "some-domain.com:80" => %Conn{ + conn: conn, + state: :up, + waiting_pids: [] + } + } + } = Connections.get_state(name) + + assert Enum.all?(conns, fn res -> res == conn end) + end + + describe "integration test" do + @describetag :integration + + test "opens connection and reuse it on next request", %{name: name} do + api = Pleroma.Config.get([API]) + Pleroma.Config.put([API], :gun) + on_exit(fn -> Pleroma.Config.put([API], api) end) + conn = Connections.get_conn(name, "http://httpbin.org") + + assert is_pid(conn) + assert Process.alive?(conn) + + reused_conn = Connections.get_conn(name, "http://httpbin.org") + + assert conn == reused_conn + + %Connections{ + conns: %{ + "httpbin.org:80" => %Conn{ + conn: ^conn, + state: :up, + waiting_pids: [] + } + } + } = Connections.get_state(name) + end + end +end diff --git a/test/reverse_proxy/reverse_proxy_test.exs b/test/reverse_proxy/reverse_proxy_test.exs index 95adae666..56ac8672b 100644 --- a/test/reverse_proxy/reverse_proxy_test.exs +++ b/test/reverse_proxy/reverse_proxy_test.exs @@ -332,5 +332,38 @@ defmodule Pleroma.ReverseProxyTest do describe "integration tests" do @describetag :integration + + test "with hackney client", %{conn: conn} do + client = Pleroma.Config.get([Pleroma.ReverseProxy.Client]) + Pleroma.Config.put([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney) + + on_exit(fn -> + Pleroma.Config.put([Pleroma.ReverseProxy.Client], client) + end) + + conn = ReverseProxy.call(conn, "http://httpbin.org/stream-bytes/10") + + assert byte_size(conn.resp_body) == 10 + assert conn.state == :chunked + assert conn.status == 200 + end + + test "with tesla client with gun adapter", %{conn: conn} do + client = Pleroma.Config.get([Pleroma.ReverseProxy.Client]) + Pleroma.Config.put([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Tesla) + adapter = Application.get_env(:tesla, :adapter) + Application.put_env(:tesla, :adapter, Tesla.Adapter.Gun) + + conn = ReverseProxy.call(conn, "http://httpbin.org/stream-bytes/10") + + assert byte_size(conn.resp_body) == 10 + assert conn.state == :chunked + assert conn.status == 200 + + on_exit(fn -> + Pleroma.Config.put([Pleroma.ReverseProxy.Client], client) + Application.put_env(:tesla, :adapter, adapter) + end) + end end end |