diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/gun/conn.ex | 45 | ||||
-rw-r--r-- | lib/pleroma/http/adapter_helper/gun.ex | 1 | ||||
-rw-r--r-- | lib/pleroma/pool/connections.ex | 74 |
3 files changed, 84 insertions, 36 deletions
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex index a0f110b42..4c2f5b1bf 100644 --- a/lib/pleroma/gun/conn.ex +++ b/lib/pleroma/gun/conn.ex @@ -11,23 +11,25 @@ defmodule Pleroma.Gun.Conn do require Logger - @type gun_state :: :up | :down - @type conn_state :: :active | :idle + @type gun_state :: :init | :up | :down + @type conn_state :: :init | :active | :idle @type t :: %__MODULE__{ conn: pid(), gun_state: gun_state(), conn_state: conn_state(), - used_by: [pid()], + used_by: [GenServer.from()], + awaited_by: [GenServer.from()], last_reference: pos_integer(), crf: float(), retries: pos_integer() } defstruct conn: nil, - gun_state: :open, - conn_state: :idle, + gun_state: :init, + conn_state: :init, used_by: [], + awaited_by: [], last_reference: 0, crf: 1, retries: 0 @@ -51,22 +53,19 @@ defmodule Pleroma.Gun.Conn do max_connections = pool_opts[:max_connections] || 250 - conn_pid = - if Connections.count(name) < max_connections do - do_open(uri, opts) - else - close_least_used_and_do_open(name, uri, opts) - end - - if is_pid(conn_pid) do - conn = %Pleroma.Gun.Conn{ - conn: conn_pid, - gun_state: :up, - last_reference: :os.system_time(:second) - } - + with {:ok, conn_pid} <- try_open(name, uri, opts, max_connections) do :ok = Gun.set_owner(conn_pid, Process.whereis(name)) - Connections.add_conn(name, key, conn) + Connections.update_conn(name, key, conn_pid) + else + _error -> Connections.remove_conn(name, key) + end + end + + defp try_open(name, uri, opts, max_connections) do + if Connections.count(name) < max_connections do + do_open(uri, opts) + else + close_least_used_and_do_open(name, uri, opts) end end @@ -104,7 +103,7 @@ defmodule Pleroma.Gun.Conn do {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]), stream <- Gun.connect(conn, connect_opts), {:response, :fin, 200, _} <- Gun.await(conn, stream) do - conn + {:ok, conn} else error -> Logger.warn( @@ -140,7 +139,7 @@ defmodule Pleroma.Gun.Conn do with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts), {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do - conn + {:ok, conn} else error -> Logger.warn( @@ -158,7 +157,7 @@ defmodule Pleroma.Gun.Conn do with {:ok, conn} <- Gun.open(host, port, opts), {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do - conn + {:ok, conn} else error -> Logger.warn( diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex index 18cf64bcc..bdf2bcc06 100644 --- a/lib/pleroma/http/adapter_helper/gun.ex +++ b/lib/pleroma/http/adapter_helper/gun.ex @@ -73,7 +73,6 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do defp checkin_conn(uri, opts) do case Connections.checkin(uri, :gun_connections) do nil -> - Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts]) opts conn when is_pid(conn) -> diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex index b9618d3c7..28d37760e 100644 --- a/lib/pleroma/pool/connections.ex +++ b/lib/pleroma/pool/connections.ex @@ -7,11 +7,12 @@ defmodule Pleroma.Pool.Connections do alias Pleroma.Config alias Pleroma.Gun + alias Pleroma.Gun.Conn require Logger @type domain :: String.t() - @type conn :: Pleroma.Gun.Conn.t() + @type conn :: Conn.t() @type seconds :: pos_integer() @type t :: %__MODULE__{ @@ -33,13 +34,11 @@ defmodule Pleroma.Pool.Connections do end @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil - def checkin(url, name) - def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name) + def checkin(url, name, opts \\ []) + def checkin(url, name, opts) when is_binary(url), do: checkin(URI.parse(url), name, opts) - def checkin(%URI{} = uri, name) do - timeout = Config.get([:connections_pool, :checkin_timeout], 250) - - GenServer.call(name, {:checkin, uri}, timeout) + def checkin(%URI{} = uri, name, opts) do + GenServer.call(name, {:checkin, uri, opts, name}) end @spec alive?(atom()) :: boolean() @@ -71,21 +70,55 @@ defmodule Pleroma.Pool.Connections do GenServer.cast(name, {:checkout, conn, pid}) end - @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok + @spec add_conn(atom(), String.t(), Conn.t()) :: :ok def add_conn(name, key, conn) do GenServer.cast(name, {:add_conn, key, conn}) end + @spec update_conn(atom(), String.t(), pid()) :: :ok + def update_conn(name, key, conn_pid) do + GenServer.cast(name, {:update_conn, key, conn_pid}) + end + @spec remove_conn(atom(), String.t()) :: :ok def remove_conn(name, key) do GenServer.cast(name, {:remove_conn, key}) end + @spec refresh(atom()) :: :ok + def refresh(name) do + GenServer.call(name, :refresh) + end + @impl true def handle_cast({:add_conn, key, conn}, state) do - state = put_in(state.conns[key], conn) + {:noreply, put_in(state.conns[key], conn)} + end + + @impl true + def handle_cast({:update_conn, key, conn_pid}, state) do + conn = state.conns[key] + + Process.monitor(conn_pid) + + conn = + Enum.reduce(conn.awaited_by, conn, fn waiting, conn -> + GenServer.reply(waiting, conn_pid) + time = :os.system_time(:second) + last_reference = time - conn.last_reference + crf = crf(last_reference, 100, conn.crf) + + %{ + conn + | last_reference: time, + crf: crf, + conn_state: :active, + used_by: [waiting | conn.used_by] + } + end) + + state = put_in(state.conns[key], %{conn | conn: conn_pid, gun_state: :up, awaited_by: []}) - Process.monitor(conn.conn) {:noreply, state} end @@ -113,12 +146,14 @@ defmodule Pleroma.Pool.Connections do @impl true def handle_cast({:remove_conn, key}, state) do + conn = state.conns[key] + Enum.each(conn.awaited_by, fn waiting -> GenServer.reply(waiting, nil) end) state = put_in(state.conns, Map.delete(state.conns, key)) {:noreply, state} end @impl true - def handle_call({:checkin, uri}, from, state) do + def handle_call({:checkin, uri, opts, name}, from, state) do key = "#{uri.scheme}:#{uri.host}:#{uri.port}" case state.conns[key] do @@ -141,8 +176,18 @@ defmodule Pleroma.Pool.Connections do %{gun_state: :down} -> {:reply, nil, state} + %{gun_state: :init} = conn -> + state = put_in(state.conns[key], %{conn | awaited_by: [from | conn.awaited_by]}) + {:noreply, state} + nil -> - {:reply, nil, state} + state = + put_in(state.conns[key], %Conn{ + awaited_by: [from] + }) + + Task.start(Conn, :open, [uri, name, opts]) + {:noreply, state} end end @@ -150,6 +195,11 @@ defmodule Pleroma.Pool.Connections do def handle_call(:state, _from, state), do: {:reply, state, state} @impl true + def handle_call(:refresh, _from, state) do + {:reply, :ok, put_in(state.conns, %{})} + end + + @impl true def handle_call(:count, _from, state) do {:reply, Enum.count(state.conns), state} end |