aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex S <alex.strizhakov@gmail.com>2019-08-13 14:37:19 +0300
committerAlex S <alex.strizhakov@gmail.com>2019-08-20 12:39:53 +0300
commit2caf9ad95424b0a4f47c1e22ce9d57a29fcf9fbb (patch)
tree9ffd3a5ac15c0f1e27dcb174a3448eb5944f7cd7
parentc51aa48e60103307307c7c40c2d046719def7054 (diff)
downloadpleroma-2caf9ad95424b0a4f47c1e22ce9d57a29fcf9fbb.tar.gz
added gun connections genserver
-rw-r--r--config/test.exs2
-rw-r--r--lib/pleroma/gun/api/api.ex15
-rw-r--r--lib/pleroma/gun/api/mock.ex40
-rw-r--r--lib/pleroma/gun/conn.ex17
-rw-r--r--lib/pleroma/gun/connections.ex104
-rw-r--r--lib/pleroma/http/connection.ex1
-rw-r--r--lib/pleroma/reverse_proxy/client.ex2
-rw-r--r--lib/pleroma/reverse_proxy/client/tesla.ex10
-rw-r--r--mix.exs2
-rw-r--r--mix.lock2
-rw-r--r--test/gun/connections_test.exs172
-rw-r--r--test/reverse_proxy/reverse_proxy_test.exs33
12 files changed, 393 insertions, 7 deletions
diff --git a/config/test.exs b/config/test.exs
index 6f75f39b5..ca916d59e 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -85,6 +85,8 @@ config :joken, default_signer: "yU8uHKq+yyAkZ11Hx//jcdacWc8yQ1bxAAGrplzB0Zwwjkp3
config :pleroma, Pleroma.ReverseProxy.Client, Pleroma.ReverseProxy.ClientMock
+config :pleroma, Pleroma.Gun.API, Pleroma.Gun.API.Mock
+
try do
import_config "test.secret.exs"
rescue
diff --git a/lib/pleroma/gun/api/api.ex b/lib/pleroma/gun/api/api.ex
new file mode 100644
index 000000000..c69ab890e
--- /dev/null
+++ b/lib/pleroma/gun/api/api.ex
@@ -0,0 +1,15 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Gun.API do
+ @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()}
+
+ def open(host, port, opts) do
+ api().open(host, port, opts)
+ end
+
+ defp api do
+ Pleroma.Config.get([Pleroma.Gun.API], :gun)
+ end
+end
diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex
new file mode 100644
index 000000000..3348715c4
--- /dev/null
+++ b/lib/pleroma/gun/api/mock.ex
@@ -0,0 +1,40 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Gun.API.Mock do
+ @behaviour Pleroma.Gun.API
+ @impl Pleroma.Gun.API
+ def open('some-domain.com', 80, %{genserver_pid: genserver_pid}) do
+ {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+ send(genserver_pid, {:gun_up, conn_pid, :http})
+ {:ok, conn_pid}
+ end
+
+ def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do
+ {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+ send(genserver_pid, {:gun_up, conn_pid, :https})
+ {:ok, conn_pid}
+ end
+
+ @impl Pleroma.Gun.API
+ def open('gun_down.com', _port, %{genserver_pid: genserver_pid}) do
+ {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+ send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
+ {:ok, conn_pid}
+ end
+
+ @impl Pleroma.Gun.API
+ def open('gun_down_and_up.com', _port, %{genserver_pid: genserver_pid}) do
+ {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+ send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
+
+ {:ok, _} =
+ Task.start_link(fn ->
+ Process.sleep(500)
+ send(genserver_pid, {:gun_up, conn_pid, :http})
+ end)
+
+ {:ok, conn_pid}
+ end
+end
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex
new file mode 100644
index 000000000..62ef146a1
--- /dev/null
+++ b/lib/pleroma/gun/conn.ex
@@ -0,0 +1,17 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Gun.Conn do
+ @moduledoc """
+ Struct for gun connection data
+ """
+ @type t :: %__MODULE__{
+ conn: pid(),
+ state: atom(),
+ waiting_pids: [pid()],
+ protocol: atom()
+ }
+
+ defstruct conn: nil, state: :open, waiting_pids: [], protocol: :http
+end
diff --git a/lib/pleroma/gun/connections.ex b/lib/pleroma/gun/connections.ex
new file mode 100644
index 000000000..60ec68d89
--- /dev/null
+++ b/lib/pleroma/gun/connections.ex
@@ -0,0 +1,104 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Gun.Connections do
+ use GenServer
+
+ @type domain :: String.t()
+ @type conn :: Gun.Conn.t()
+ @type t :: %__MODULE__{
+ conns: %{domain() => conn()}
+ }
+
+ defstruct conns: %{}
+
+ def start_link(name \\ __MODULE__) do
+ if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
+ GenServer.start_link(__MODULE__, [], name: name)
+ else
+ :ignore
+ end
+ end
+
+ @impl true
+ def init(_) do
+ {:ok, %__MODULE__{conns: %{}}}
+ end
+
+ @spec get_conn(atom(), String.t(), keyword()) :: pid()
+ def get_conn(name \\ __MODULE__, url, opts \\ []) do
+ opts = Enum.into(opts, %{})
+ uri = URI.parse(url)
+
+ opts = if uri.scheme == "https", do: Map.put(opts, :transport, :tls), else: opts
+
+ GenServer.call(
+ name,
+ {:conn, %{opts: opts, uri: uri}}
+ )
+ end
+
+ @spec get_state(atom()) :: t()
+ def get_state(name \\ __MODULE__) do
+ GenServer.call(name, {:state})
+ end
+
+ @impl true
+ def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do
+ key = compose_key(uri)
+
+ case state.conns[key] do
+ %{conn: conn, state: conn_state} when conn_state == :up ->
+ {:reply, conn, state}
+
+ %{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] ->
+ state = put_in(state.conns[key].waiting_pids, [from | pids])
+ {:noreply, state}
+
+ nil ->
+ {:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts)
+
+ state =
+ put_in(state.conns[key], %Pleroma.Gun.Conn{
+ conn: conn,
+ waiting_pids: [from],
+ protocol: String.to_atom(uri.scheme)
+ })
+
+ {:noreply, state}
+ end
+ end
+
+ @impl true
+ def handle_call({:state}, _from, state), do: {:reply, state, state}
+
+ @impl true
+ def handle_info({:gun_up, conn_pid, protocol}, state) do
+ {key, conn} = find_conn(state.conns, conn_pid, protocol)
+
+ # Send to all waiting processes connection pid
+ Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
+
+ # Update state of the current connection and set waiting_pids to empty list
+ state = put_in(state.conns[key], %{conn | state: :up, waiting_pids: []})
+ {:noreply, state}
+ end
+
+ @impl true
+ # Do we need to do something with killed & unprocessed references?
+ def handle_info({:gun_down, conn_pid, protocol, _reason, _killed, _unprocessed}, state) do
+ {key, conn} = find_conn(state.conns, conn_pid, protocol)
+
+ # We don't want to block requests to GenServer if gun send down message, return nil, so we can make some retries, while connection is not up
+ Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)
+
+ state = put_in(state.conns[key].state, :down)
+ {:noreply, state}
+ end
+
+ defp compose_key(uri), do: uri.host <> ":" <> to_string(uri.port)
+
+ defp find_conn(conns, conn_pid, protocol),
+ do: Enum.find(conns, fn {_, conn} -> conn.conn == conn_pid and conn.protocol == protocol end)
+end
diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex
index ff03a3ee3..6cb26c0fe 100644
--- a/lib/pleroma/http/connection.ex
+++ b/lib/pleroma/http/connection.ex
@@ -9,7 +9,6 @@ defmodule Pleroma.HTTP.Connection do
@options [
connect_timeout: 10_000,
- protocols: [:http],
timeout: 20_000
]
diff --git a/lib/pleroma/reverse_proxy/client.ex b/lib/pleroma/reverse_proxy/client.ex
index 42f2ff13b..71c2b2911 100644
--- a/lib/pleroma/reverse_proxy/client.ex
+++ b/lib/pleroma/reverse_proxy/client.ex
@@ -28,6 +28,6 @@ defmodule Pleroma.ReverseProxy.Client do
def close(ref), do: client().close(ref)
defp client do
- Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney)
+ Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Tesla)
end
end
diff --git a/lib/pleroma/reverse_proxy/client/tesla.ex b/lib/pleroma/reverse_proxy/client/tesla.ex
index b1498a5a4..fad577ec1 100644
--- a/lib/pleroma/reverse_proxy/client/tesla.ex
+++ b/lib/pleroma/reverse_proxy/client/tesla.ex
@@ -1,3 +1,7 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-onl
+
defmodule Pleroma.ReverseProxy.Client.Tesla do
@behaviour Pleroma.ReverseProxy.Client
@@ -6,7 +10,7 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do
def request(method, url, headers, body, opts \\ []) do
adapter_opts =
Keyword.get(opts, :adapter, [])
- |> Keyword.put(:chunks_response, true)
+ |> Keyword.put(:body_as, :chunks)
with {:ok, response} <-
Pleroma.HTTP.request(
@@ -44,13 +48,13 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do
adapter.read_chunk(pid, stream, opts)
end
- def close(client) do
+ def close(pid) do
adapter = Application.get_env(:tesla, :adapter)
unless adapter in @adapters do
raise "#{adapter} doesn't support closing connection"
end
- adapter.close(client)
+ adapter.close(pid)
end
end
diff --git a/mix.exs b/mix.exs
index f1fbdc6b3..9d04b5771 100644
--- a/mix.exs
+++ b/mix.exs
@@ -113,7 +113,7 @@ defmodule Pleroma.Mixfile do
{:poison, "~> 3.0", override: true},
{:tesla,
github: "alex-strizhakov/tesla",
- ref: "beb8927358dfaa66ecd458df607befde12dd56e0",
+ ref: "c29a7fd030fa6decbf7091152f563fe322e2b589",
override: true},
{:cowlib, "~> 2.6.0", override: true},
{:gun, "~> 1.3"},
diff --git a/mix.lock b/mix.lock
index 635f20185..ef5ebda6c 100644
--- a/mix.lock
+++ b/mix.lock
@@ -85,7 +85,7 @@
"swoosh": {:hex, :swoosh, "0.23.2", "7dda95ff0bf54a2298328d6899c74dae1223777b43563ccebebb4b5d2b61df38", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"},
"syslog": {:git, "https://github.com/Vagabond/erlang-syslog.git", "4a6c6f2c996483e86c1320e9553f91d337bcb6aa", [tag: "1.0.5"]},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"},
- "tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "beb8927358dfaa66ecd458df607befde12dd56e0", [ref: "beb8927358dfaa66ecd458df607befde12dd56e0"]},
+ "tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "c29a7fd030fa6decbf7091152f563fe322e2b589", [ref: "c29a7fd030fa6decbf7091152f563fe322e2b589"]},
"timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm"},
"trailing_format_plug": {:hex, :trailing_format_plug, "0.0.7", "64b877f912cf7273bed03379936df39894149e35137ac9509117e59866e10e45", [:mix], [{:plug, "> 0.12.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"tzdata": {:hex, :tzdata, "0.5.21", "8cbf3607fcce69636c672d5be2bbb08687fe26639a62bdcc283d267277db7cf0", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
diff --git a/test/gun/connections_test.exs b/test/gun/connections_test.exs
new file mode 100644
index 000000000..2ec8f3993
--- /dev/null
+++ b/test/gun/connections_test.exs
@@ -0,0 +1,172 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Gun.ConnectionsTest do
+ use ExUnit.Case, async: true
+ alias Pleroma.Gun.{Connections, Conn, API}
+
+ setup do
+ name = :test_gun_connections
+ {:ok, pid} = Connections.start_link(name)
+
+ {:ok, name: name, pid: pid}
+ end
+
+ test "opens connection and reuse it on next request", %{name: name, pid: pid} do
+ conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
+
+ assert is_pid(conn)
+ assert Process.alive?(conn)
+
+ reused_conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
+
+ assert conn == reused_conn
+
+ %Connections{
+ conns: %{
+ "some-domain.com:80" => %Conn{
+ conn: ^conn,
+ state: :up,
+ waiting_pids: []
+ }
+ }
+ } = Connections.get_state(name)
+ end
+
+ test "reuses connection based on protocol", %{name: name, pid: pid} do
+ conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
+ assert is_pid(conn)
+ assert Process.alive?(conn)
+
+ https_conn = Connections.get_conn(name, "https://some-domain.com", genserver_pid: pid)
+
+ refute conn == https_conn
+
+ reused_https = Connections.get_conn(name, "https://some-domain.com", genserver_pid: pid)
+
+ refute conn == reused_https
+
+ assert reused_https == https_conn
+
+ %Connections{
+ conns: %{
+ "some-domain.com:80" => %Conn{
+ conn: ^conn,
+ state: :up,
+ waiting_pids: []
+ },
+ "some-domain.com:443" => %Conn{
+ conn: ^https_conn,
+ state: :up,
+ waiting_pids: []
+ }
+ }
+ } = Connections.get_state(name)
+ end
+
+ test "process gun_down message", %{name: name, pid: pid} do
+ conn = Connections.get_conn(name, "http://gun_down.com", genserver_pid: pid)
+
+ refute conn
+
+ %Connections{
+ conns: %{
+ "gun_down.com:80" => %Conn{
+ conn: _,
+ state: :down,
+ waiting_pids: _
+ }
+ }
+ } = Connections.get_state(name)
+ end
+
+ test "process gun_down message and then gun_up", %{name: name, pid: pid} do
+ conn = Connections.get_conn(name, "http://gun_down_and_up.com", genserver_pid: pid)
+
+ refute conn
+
+ %Connections{
+ conns: %{
+ "gun_down_and_up.com:80" => %Conn{
+ conn: _,
+ state: :down,
+ waiting_pids: _
+ }
+ }
+ } = Connections.get_state(name)
+
+ conn = Connections.get_conn(name, "http://gun_down_and_up.com", genserver_pid: pid)
+
+ assert is_pid(conn)
+ assert Process.alive?(conn)
+
+ %Connections{
+ conns: %{
+ "gun_down_and_up.com:80" => %Conn{
+ conn: _,
+ state: :up,
+ waiting_pids: []
+ }
+ }
+ } = Connections.get_state(name)
+ end
+
+ test "async processes get same conn for same domain", %{name: name, pid: pid} do
+ tasks =
+ for _ <- 1..5 do
+ Task.async(fn ->
+ Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
+ end)
+ end
+
+ tasks_with_results = Task.yield_many(tasks)
+
+ results =
+ Enum.map(tasks_with_results, fn {task, res} ->
+ res || Task.shutdown(task, :brutal_kill)
+ end)
+
+ conns = for {:ok, value} <- results, do: value
+
+ %Connections{
+ conns: %{
+ "some-domain.com:80" => %Conn{
+ conn: conn,
+ state: :up,
+ waiting_pids: []
+ }
+ }
+ } = Connections.get_state(name)
+
+ assert Enum.all?(conns, fn res -> res == conn end)
+ end
+
+ describe "integration test" do
+ @describetag :integration
+
+ test "opens connection and reuse it on next request", %{name: name} do
+ api = Pleroma.Config.get([API])
+ Pleroma.Config.put([API], :gun)
+ on_exit(fn -> Pleroma.Config.put([API], api) end)
+ conn = Connections.get_conn(name, "http://httpbin.org")
+
+ assert is_pid(conn)
+ assert Process.alive?(conn)
+
+ reused_conn = Connections.get_conn(name, "http://httpbin.org")
+
+ assert conn == reused_conn
+
+ %Connections{
+ conns: %{
+ "httpbin.org:80" => %Conn{
+ conn: ^conn,
+ state: :up,
+ waiting_pids: []
+ }
+ }
+ } = Connections.get_state(name)
+ end
+ end
+end
diff --git a/test/reverse_proxy/reverse_proxy_test.exs b/test/reverse_proxy/reverse_proxy_test.exs
index 95adae666..56ac8672b 100644
--- a/test/reverse_proxy/reverse_proxy_test.exs
+++ b/test/reverse_proxy/reverse_proxy_test.exs
@@ -332,5 +332,38 @@ defmodule Pleroma.ReverseProxyTest do
describe "integration tests" do
@describetag :integration
+
+ test "with hackney client", %{conn: conn} do
+ client = Pleroma.Config.get([Pleroma.ReverseProxy.Client])
+ Pleroma.Config.put([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney)
+
+ on_exit(fn ->
+ Pleroma.Config.put([Pleroma.ReverseProxy.Client], client)
+ end)
+
+ conn = ReverseProxy.call(conn, "http://httpbin.org/stream-bytes/10")
+
+ assert byte_size(conn.resp_body) == 10
+ assert conn.state == :chunked
+ assert conn.status == 200
+ end
+
+ test "with tesla client with gun adapter", %{conn: conn} do
+ client = Pleroma.Config.get([Pleroma.ReverseProxy.Client])
+ Pleroma.Config.put([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Tesla)
+ adapter = Application.get_env(:tesla, :adapter)
+ Application.put_env(:tesla, :adapter, Tesla.Adapter.Gun)
+
+ conn = ReverseProxy.call(conn, "http://httpbin.org/stream-bytes/10")
+
+ assert byte_size(conn.resp_body) == 10
+ assert conn.state == :chunked
+ assert conn.status == 200
+
+ on_exit(fn ->
+ Pleroma.Config.put([Pleroma.ReverseProxy.Client], client)
+ Application.put_env(:tesla, :adapter, adapter)
+ end)
+ end
end
end