aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex S <alex.strizhakov@gmail.com>2019-08-19 15:40:06 +0300
committerAlex S <alex.strizhakov@gmail.com>2019-08-20 12:42:53 +0300
commit814159e668d48ace95d8feac8847e1d68d07810b (patch)
treefb8d1e5ff44c32ef9ce85b4589ef6e188c4243ce
parent210e116dc0ee48183bf7b7a363c6d053f88d30fe (diff)
downloadpleroma-814159e668d48ace95d8feac8847e1d68d07810b.tar.gz
expanding gun connections
closing least frequently used separate pools with settings
-rw-r--r--config/config.exs18
-rw-r--r--lib/pleroma/application.ex14
-rw-r--r--lib/pleroma/gun/api/api.ex10
-rw-r--r--lib/pleroma/gun/api/gun.ex6
-rw-r--r--lib/pleroma/gun/api/mock.ex45
-rw-r--r--lib/pleroma/gun/conn.ex4
-rw-r--r--lib/pleroma/gun/connections.ex89
-rw-r--r--test/gun/connections_test.exs94
8 files changed, 230 insertions, 50 deletions
diff --git a/config/config.exs b/config/config.exs
index 63162d594..e56b9730d 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -550,6 +550,24 @@ config :pleroma, :rate_limit,
password_reset: {1_800_000, 5},
account_confirmation_resend: {8_640_000, 5}
+config :pleroma, :gun_pools,
+ federation: [
+ max_connections: 50,
+ timeout: 150_000
+ ],
+ media: [
+ max_connections: 50,
+ timeout: 150_000
+ ],
+ upload: [
+ max_connections: 25,
+ timeout: 300_000
+ ],
+ default: [
+ max_connections: 10,
+ timout: 20_000
+ ]
+
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 25e56b9e2..06d1a187e 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -39,6 +39,7 @@ defmodule Pleroma.Application do
] ++
cachex_children() ++
hackney_pool_children() ++
+ gun_pools() ++
[
Pleroma.Web.Federator.RetryQueue,
Pleroma.Stats,
@@ -163,6 +164,19 @@ defmodule Pleroma.Application do
end
end
+ defp gun_pools do
+ if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
+ for {pool_name, opts} <- Pleroma.Config.get([:gun_pools]) do
+ %{
+ id: :"gun_pool_#{pool_name}",
+ start: {Pleroma.Gun.Connections, :start_link, [{pool_name, opts}]}
+ }
+ end
+ else
+ []
+ end
+ end
+
defp after_supervisor_start do
with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
true <- digest_config[:active] do
diff --git a/lib/pleroma/gun/api/api.ex b/lib/pleroma/gun/api/api.ex
index 19adc1bf0..7e6d2f929 100644
--- a/lib/pleroma/gun/api/api.ex
+++ b/lib/pleroma/gun/api/api.ex
@@ -4,11 +4,21 @@
defmodule Pleroma.Gun.API do
@callback open(charlist(), pos_integer(), map()) :: {:ok, pid()}
+ @callback info(pid()) :: map()
+ @callback close(pid()) :: :ok
def open(host, port, opts) do
api().open(host, port, opts)
end
+ def info(pid) do
+ api().info(pid)
+ end
+
+ def close(pid) do
+ api().close(pid)
+ end
+
defp api do
Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun.API.Gun)
end
diff --git a/lib/pleroma/gun/api/gun.ex b/lib/pleroma/gun/api/gun.ex
index 14a4b7275..33e7985a1 100644
--- a/lib/pleroma/gun/api/gun.ex
+++ b/lib/pleroma/gun/api/gun.ex
@@ -19,4 +19,10 @@ defmodule Pleroma.Gun.API.Gun do
def open(host, port, opts) do
:gun.open(host, port, Map.take(opts, @gun_keys))
end
+
+ @impl Pleroma.Gun.API
+ def info(pid), do: :gun.info(pid)
+
+ @impl Pleroma.Gun.API
+ def close(pid), do: :gun.close(pid)
end
diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex
index ff9e13a74..a80559f0b 100644
--- a/lib/pleroma/gun/api/mock.ex
+++ b/lib/pleroma/gun/api/mock.ex
@@ -5,36 +5,75 @@
defmodule Pleroma.Gun.API.Mock do
@behaviour Pleroma.Gun.API
@impl Pleroma.Gun.API
- def open('some-domain.com', 80, %{genserver_pid: genserver_pid}) do
+ def open(domain, 80, %{genserver_pid: genserver_pid})
+ when domain in ['another-domain.com', 'some-domain.com'] do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{
+ origin_scheme: "http",
+ origin_host: domain,
+ origin_port: 80
+ })
+
send(genserver_pid, {:gun_up, conn_pid, :http})
{:ok, conn_pid}
end
def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{
+ origin_scheme: "https",
+ origin_host: 'some-domain.com',
+ origin_port: 443
+ })
+
send(genserver_pid, {:gun_up, conn_pid, :http2})
{:ok, conn_pid}
end
@impl Pleroma.Gun.API
- def open('gun_down.com', _port, %{genserver_pid: genserver_pid}) do
+ def open('gun_down.com', 80, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{
+ origin_scheme: "http",
+ origin_host: 'gun_down.com',
+ origin_port: 80
+ })
+
send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
{:ok, conn_pid}
end
@impl Pleroma.Gun.API
- def open('gun_down_and_up.com', _port, %{genserver_pid: genserver_pid}) do
+ def open('gun_down_and_up.com', 80, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
+
+ Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{
+ origin_scheme: "http",
+ origin_host: 'gun_down_and_up.com',
+ origin_port: 80
+ })
+
send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
{:ok, _} =
Task.start_link(fn ->
Process.sleep(500)
+
send(genserver_pid, {:gun_up, conn_pid, :http})
end)
{:ok, conn_pid}
end
+
+ @impl Pleroma.Gun.API
+ def info(pid) do
+ [{_, info}] = Registry.lookup(Pleroma.Gun.API.Mock, pid)
+ info
+ end
+
+ @impl Pleroma.Gun.API
+ def close(_pid), do: :ok
end
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex
index 62ef146a1..20ddec64c 100644
--- a/lib/pleroma/gun/conn.ex
+++ b/lib/pleroma/gun/conn.ex
@@ -10,8 +10,8 @@ defmodule Pleroma.Gun.Conn do
conn: pid(),
state: atom(),
waiting_pids: [pid()],
- protocol: atom()
+ used: pos_integer()
}
- defstruct conn: nil, state: :open, waiting_pids: [], protocol: :http
+ defstruct conn: nil, state: :open, waiting_pids: [], used: 0
end
diff --git a/lib/pleroma/gun/connections.ex b/lib/pleroma/gun/connections.ex
index a3f1b0351..cec3de2ca 100644
--- a/lib/pleroma/gun/connections.ex
+++ b/lib/pleroma/gun/connections.ex
@@ -6,29 +6,31 @@ defmodule Pleroma.Gun.Connections do
use GenServer
@type domain :: String.t()
- @type conn :: Gun.Conn.t()
+ @type conn :: Pleroma.Gun.Conn.t()
+
@type t :: %__MODULE__{
- conns: %{domain() => conn()}
+ conns: %{domain() => conn()},
+ opts: keyword()
}
- defstruct conns: %{}
+ defstruct conns: %{}, opts: []
- def start_link(name \\ __MODULE__) do
+ @spec start_link({atom(), keyword()}) :: {:ok, pid()} | :ignore
+ def start_link({name, opts}) do
if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
- GenServer.start_link(__MODULE__, [], name: name)
+ GenServer.start_link(__MODULE__, opts, name: name)
else
:ignore
end
end
@impl true
- def init(_) do
- {:ok, %__MODULE__{conns: %{}}}
- end
+ def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
@spec get_conn(String.t(), keyword(), atom()) :: pid()
- def get_conn(url, opts \\ [], name \\ __MODULE__) do
+ def get_conn(url, opts \\ [], name \\ :default) do
opts = Enum.into(opts, %{})
+
uri = URI.parse(url)
opts =
@@ -58,13 +60,13 @@ defmodule Pleroma.Gun.Connections do
end
@spec alive?(atom()) :: boolean()
- def alive?(name \\ __MODULE__) do
+ def alive?(name \\ :default) do
pid = Process.whereis(name)
if pid, do: Process.alive?(pid), else: false
end
@spec get_state(atom()) :: t()
- def get_state(name \\ __MODULE__) do
+ def get_state(name \\ :default) do
GenServer.call(name, {:state})
end
@@ -73,7 +75,8 @@ defmodule Pleroma.Gun.Connections do
key = compose_key(uri)
case state.conns[key] do
- %{conn: conn, state: conn_state} when conn_state == :up ->
+ %{conn: conn, state: conn_state, used: used} when conn_state == :up ->
+ state = put_in(state.conns[key].used, used + 1)
{:reply, conn, state}
%{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] ->
@@ -81,16 +84,23 @@ defmodule Pleroma.Gun.Connections do
{:noreply, state}
nil ->
- {:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts)
+ max_connections = state.opts[:max_connections]
- state =
- put_in(state.conns[key], %Pleroma.Gun.Conn{
- conn: conn,
- waiting_pids: [from],
- protocol: String.to_atom(uri.scheme)
- })
+ if Enum.count(state.conns) < max_connections do
+ open_conn(key, uri, from, state, opts)
+ else
+ [{close_key, least_used} | _conns] = Enum.sort_by(state.conns, fn {_k, v} -> v.used end)
- {:noreply, state}
+ :ok = Pleroma.Gun.API.close(least_used.conn)
+
+ state =
+ put_in(
+ state.conns,
+ Map.delete(state.conns, close_key)
+ )
+
+ open_conn(key, uri, from, state, opts)
+ end
end
end
@@ -99,20 +109,29 @@ defmodule Pleroma.Gun.Connections do
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
- {key, conn} = find_conn(state.conns, conn_pid)
+ conn_key = compose_key_gun_info(conn_pid)
+ {key, conn} = find_conn(state.conns, conn_pid, conn_key)
# Send to all waiting processes connection pid
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
# Update state of the current connection and set waiting_pids to empty list
- state = put_in(state.conns[key], %{conn | state: :up, waiting_pids: []})
+ state =
+ put_in(state.conns[key], %{
+ conn
+ | state: :up,
+ waiting_pids: [],
+ used: conn.used + length(conn.waiting_pids)
+ })
+
{:noreply, state}
end
@impl true
# Do we need to do something with killed & unprocessed references?
def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed, _unprocessed}, state) do
- {key, conn} = find_conn(state.conns, conn_pid)
+ conn_key = compose_key_gun_info(conn_pid)
+ {key, conn} = find_conn(state.conns, conn_pid, conn_key)
# We don't want to block requests to GenServer if gun send down message, return nil, so we can make some retries, while connection is not up
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)
@@ -121,12 +140,28 @@ defmodule Pleroma.Gun.Connections do
{:noreply, state}
end
- defp compose_key(uri), do: uri.host <> ":" <> to_string(uri.port)
+ defp compose_key(uri), do: "#{uri.scheme}:#{uri.host}:#{uri.port}"
- defp find_conn(conns, conn_pid) do
+ defp compose_key_gun_info(pid) do
+ info = Pleroma.Gun.API.info(pid)
+ "#{info.origin_scheme}:#{info.origin_host}:#{info.origin_port}"
+ end
+
+ defp find_conn(conns, conn_pid, conn_key) do
Enum.find(conns, fn {key, conn} ->
- protocol = if String.ends_with?(key, ":443"), do: :https, else: :http
- conn.conn == conn_pid and conn.protocol == protocol
+ key == conn_key and conn.conn == conn_pid
end)
end
+
+ defp open_conn(key, uri, from, state, opts) do
+ {:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts)
+
+ state =
+ put_in(state.conns[key], %Pleroma.Gun.Conn{
+ conn: conn,
+ waiting_pids: [from]
+ })
+
+ {:noreply, state}
+ end
end
diff --git a/test/gun/connections_test.exs b/test/gun/connections_test.exs
index a63c8eaf9..8308b5f9f 100644
--- a/test/gun/connections_test.exs
+++ b/test/gun/connections_test.exs
@@ -6,12 +6,17 @@ defmodule Gun.ConnectionsTest do
use ExUnit.Case
alias Pleroma.Gun.{Connections, Conn, API}
+ setup_all do
+ {:ok, _} = Registry.start_link(keys: :unique, name: API.Mock)
+ :ok
+ end
+
setup do
name = :test_gun_connections
adapter = Application.get_env(:tesla, :adapter)
Application.put_env(:tesla, :adapter, Tesla.Adapter.Gun)
on_exit(fn -> Application.put_env(:tesla, :adapter, adapter) end)
- {:ok, pid} = Connections.start_link(name)
+ {:ok, pid} = Connections.start_link({name, [max_connections: 2, timeout: 10]})
{:ok, name: name, pid: pid}
end
@@ -37,10 +42,11 @@ defmodule Gun.ConnectionsTest do
%Connections{
conns: %{
- "some-domain.com:80" => %Conn{
+ "http:some-domain.com:80" => %Conn{
conn: ^conn,
state: :up,
- waiting_pids: []
+ waiting_pids: [],
+ used: 2
}
}
} = Connections.get_state(name)
@@ -58,10 +64,11 @@ defmodule Gun.ConnectionsTest do
%Connections{
conns: %{
- "some-domain.com:80" => %Conn{
+ "http:some-domain.com:80" => %Conn{
conn: ^conn,
state: :up,
- waiting_pids: []
+ waiting_pids: [],
+ used: 2
}
}
} = Connections.get_state(name)
@@ -84,12 +91,12 @@ defmodule Gun.ConnectionsTest do
%Connections{
conns: %{
- "some-domain.com:80" => %Conn{
+ "http:some-domain.com:80" => %Conn{
conn: ^conn,
state: :up,
waiting_pids: []
},
- "some-domain.com:443" => %Conn{
+ "https:some-domain.com:443" => %Conn{
conn: ^https_conn,
state: :up,
waiting_pids: []
@@ -105,7 +112,7 @@ defmodule Gun.ConnectionsTest do
%Connections{
conns: %{
- "gun_down.com:80" => %Conn{
+ "http:gun_down.com:80" => %Conn{
conn: _,
state: :down,
waiting_pids: _
@@ -121,10 +128,11 @@ defmodule Gun.ConnectionsTest do
%Connections{
conns: %{
- "gun_down_and_up.com:80" => %Conn{
+ "http:gun_down_and_up.com:80" => %Conn{
conn: _,
state: :down,
- waiting_pids: _
+ waiting_pids: _,
+ used: 0
}
}
} = Connections.get_state(name)
@@ -136,10 +144,11 @@ defmodule Gun.ConnectionsTest do
%Connections{
conns: %{
- "gun_down_and_up.com:80" => %Conn{
+ "http:gun_down_and_up.com:80" => %Conn{
conn: _,
state: :up,
- waiting_pids: []
+ waiting_pids: [],
+ used: 2
}
}
} = Connections.get_state(name)
@@ -164,10 +173,11 @@ defmodule Gun.ConnectionsTest do
%Connections{
conns: %{
- "some-domain.com:80" => %Conn{
+ "http:some-domain.com:80" => %Conn{
conn: conn,
state: :up,
- waiting_pids: []
+ waiting_pids: [],
+ used: 5
}
}
} = Connections.get_state(name)
@@ -175,6 +185,52 @@ defmodule Gun.ConnectionsTest do
assert Enum.all?(conns, fn res -> res == conn end)
end
+ test "remove frequently used", %{name: name, pid: pid} do
+ Connections.get_conn("https://some-domain.com", [genserver_pid: pid], name)
+
+ for _ <- 1..4 do
+ Connections.get_conn("http://some-domain.com", [genserver_pid: pid], name)
+ end
+
+ %Connections{
+ conns: %{
+ "http:some-domain.com:80" => %Conn{
+ conn: _,
+ state: :up,
+ waiting_pids: [],
+ used: 4
+ },
+ "https:some-domain.com:443" => %Conn{
+ conn: _,
+ state: :up,
+ waiting_pids: [],
+ used: 1
+ }
+ },
+ opts: [max_connections: 2, timeout: 10]
+ } = Connections.get_state(name)
+
+ conn = Connections.get_conn("http://another-domain.com", [genserver_pid: pid], name)
+
+ %Connections{
+ conns: %{
+ "http:another-domain.com:80" => %Conn{
+ conn: ^conn,
+ state: :up,
+ waiting_pids: [],
+ used: 1
+ },
+ "http:some-domain.com:80" => %Conn{
+ conn: _,
+ state: :up,
+ waiting_pids: [],
+ used: 4
+ }
+ },
+ opts: [max_connections: 2, timeout: 10]
+ } = Connections.get_state(name)
+ end
+
describe "integration test" do
@describetag :integration
@@ -193,10 +249,11 @@ defmodule Gun.ConnectionsTest do
%Connections{
conns: %{
- "httpbin.org:80" => %Conn{
+ "http:httpbin.org:80" => %Conn{
conn: ^conn,
state: :up,
- waiting_pids: []
+ waiting_pids: [],
+ used: 2
}
}
} = Connections.get_state(name)
@@ -217,10 +274,11 @@ defmodule Gun.ConnectionsTest do
%Connections{
conns: %{
- "httpbin.org:443" => %Conn{
+ "https:httpbin.org:443" => %Conn{
conn: ^conn,
state: :up,
- waiting_pids: []
+ waiting_pids: [],
+ used: 2
}
}
} = Connections.get_state(name)