aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pleroma/gun/conn.ex45
-rw-r--r--lib/pleroma/http/adapter_helper/gun.ex1
-rw-r--r--lib/pleroma/pool/connections.ex74
3 files changed, 84 insertions, 36 deletions
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex
index a0f110b42..4c2f5b1bf 100644
--- a/lib/pleroma/gun/conn.ex
+++ b/lib/pleroma/gun/conn.ex
@@ -11,23 +11,25 @@ defmodule Pleroma.Gun.Conn do
require Logger
- @type gun_state :: :up | :down
- @type conn_state :: :active | :idle
+ @type gun_state :: :init | :up | :down
+ @type conn_state :: :init | :active | :idle
@type t :: %__MODULE__{
conn: pid(),
gun_state: gun_state(),
conn_state: conn_state(),
- used_by: [pid()],
+ used_by: [GenServer.from()],
+ awaited_by: [GenServer.from()],
last_reference: pos_integer(),
crf: float(),
retries: pos_integer()
}
defstruct conn: nil,
- gun_state: :open,
- conn_state: :idle,
+ gun_state: :init,
+ conn_state: :init,
used_by: [],
+ awaited_by: [],
last_reference: 0,
crf: 1,
retries: 0
@@ -51,22 +53,19 @@ defmodule Pleroma.Gun.Conn do
max_connections = pool_opts[:max_connections] || 250
- conn_pid =
- if Connections.count(name) < max_connections do
- do_open(uri, opts)
- else
- close_least_used_and_do_open(name, uri, opts)
- end
-
- if is_pid(conn_pid) do
- conn = %Pleroma.Gun.Conn{
- conn: conn_pid,
- gun_state: :up,
- last_reference: :os.system_time(:second)
- }
-
+ with {:ok, conn_pid} <- try_open(name, uri, opts, max_connections) do
:ok = Gun.set_owner(conn_pid, Process.whereis(name))
- Connections.add_conn(name, key, conn)
+ Connections.update_conn(name, key, conn_pid)
+ else
+ _error -> Connections.remove_conn(name, key)
+ end
+ end
+
+ defp try_open(name, uri, opts, max_connections) do
+ if Connections.count(name) < max_connections do
+ do_open(uri, opts)
+ else
+ close_least_used_and_do_open(name, uri, opts)
end
end
@@ -104,7 +103,7 @@ defmodule Pleroma.Gun.Conn do
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]),
stream <- Gun.connect(conn, connect_opts),
{:response, :fin, 200, _} <- Gun.await(conn, stream) do
- conn
+ {:ok, conn}
else
error ->
Logger.warn(
@@ -140,7 +139,7 @@ defmodule Pleroma.Gun.Conn do
with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
- conn
+ {:ok, conn}
else
error ->
Logger.warn(
@@ -158,7 +157,7 @@ defmodule Pleroma.Gun.Conn do
with {:ok, conn} <- Gun.open(host, port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
- conn
+ {:ok, conn}
else
error ->
Logger.warn(
diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex
index 18cf64bcc..bdf2bcc06 100644
--- a/lib/pleroma/http/adapter_helper/gun.ex
+++ b/lib/pleroma/http/adapter_helper/gun.ex
@@ -73,7 +73,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
defp checkin_conn(uri, opts) do
case Connections.checkin(uri, :gun_connections) do
nil ->
- Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts])
opts
conn when is_pid(conn) ->
diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex
index b9618d3c7..28d37760e 100644
--- a/lib/pleroma/pool/connections.ex
+++ b/lib/pleroma/pool/connections.ex
@@ -7,11 +7,12 @@ defmodule Pleroma.Pool.Connections do
alias Pleroma.Config
alias Pleroma.Gun
+ alias Pleroma.Gun.Conn
require Logger
@type domain :: String.t()
- @type conn :: Pleroma.Gun.Conn.t()
+ @type conn :: Conn.t()
@type seconds :: pos_integer()
@type t :: %__MODULE__{
@@ -33,13 +34,11 @@ defmodule Pleroma.Pool.Connections do
end
@spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
- def checkin(url, name)
- def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
+ def checkin(url, name, opts \\ [])
+ def checkin(url, name, opts) when is_binary(url), do: checkin(URI.parse(url), name, opts)
- def checkin(%URI{} = uri, name) do
- timeout = Config.get([:connections_pool, :checkin_timeout], 250)
-
- GenServer.call(name, {:checkin, uri}, timeout)
+ def checkin(%URI{} = uri, name, opts) do
+ GenServer.call(name, {:checkin, uri, opts, name})
end
@spec alive?(atom()) :: boolean()
@@ -71,21 +70,55 @@ defmodule Pleroma.Pool.Connections do
GenServer.cast(name, {:checkout, conn, pid})
end
- @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
+ @spec add_conn(atom(), String.t(), Conn.t()) :: :ok
def add_conn(name, key, conn) do
GenServer.cast(name, {:add_conn, key, conn})
end
+ @spec update_conn(atom(), String.t(), pid()) :: :ok
+ def update_conn(name, key, conn_pid) do
+ GenServer.cast(name, {:update_conn, key, conn_pid})
+ end
+
@spec remove_conn(atom(), String.t()) :: :ok
def remove_conn(name, key) do
GenServer.cast(name, {:remove_conn, key})
end
+ @spec refresh(atom()) :: :ok
+ def refresh(name) do
+ GenServer.call(name, :refresh)
+ end
+
@impl true
def handle_cast({:add_conn, key, conn}, state) do
- state = put_in(state.conns[key], conn)
+ {:noreply, put_in(state.conns[key], conn)}
+ end
+
+ @impl true
+ def handle_cast({:update_conn, key, conn_pid}, state) do
+ conn = state.conns[key]
+
+ Process.monitor(conn_pid)
+
+ conn =
+ Enum.reduce(conn.awaited_by, conn, fn waiting, conn ->
+ GenServer.reply(waiting, conn_pid)
+ time = :os.system_time(:second)
+ last_reference = time - conn.last_reference
+ crf = crf(last_reference, 100, conn.crf)
+
+ %{
+ conn
+ | last_reference: time,
+ crf: crf,
+ conn_state: :active,
+ used_by: [waiting | conn.used_by]
+ }
+ end)
+
+ state = put_in(state.conns[key], %{conn | conn: conn_pid, gun_state: :up, awaited_by: []})
- Process.monitor(conn.conn)
{:noreply, state}
end
@@ -113,12 +146,14 @@ defmodule Pleroma.Pool.Connections do
@impl true
def handle_cast({:remove_conn, key}, state) do
+ conn = state.conns[key]
+ Enum.each(conn.awaited_by, fn waiting -> GenServer.reply(waiting, nil) end)
state = put_in(state.conns, Map.delete(state.conns, key))
{:noreply, state}
end
@impl true
- def handle_call({:checkin, uri}, from, state) do
+ def handle_call({:checkin, uri, opts, name}, from, state) do
key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
case state.conns[key] do
@@ -141,8 +176,18 @@ defmodule Pleroma.Pool.Connections do
%{gun_state: :down} ->
{:reply, nil, state}
+ %{gun_state: :init} = conn ->
+ state = put_in(state.conns[key], %{conn | awaited_by: [from | conn.awaited_by]})
+ {:noreply, state}
+
nil ->
- {:reply, nil, state}
+ state =
+ put_in(state.conns[key], %Conn{
+ awaited_by: [from]
+ })
+
+ Task.start(Conn, :open, [uri, name, opts])
+ {:noreply, state}
end
end
@@ -150,6 +195,11 @@ defmodule Pleroma.Pool.Connections do
def handle_call(:state, _from, state), do: {:reply, state, state}
@impl true
+ def handle_call(:refresh, _from, state) do
+ {:reply, :ok, put_in(state.conns, %{})}
+ end
+
+ @impl true
def handle_call(:count, _from, state) do
{:reply, Enum.count(state.conns), state}
end