diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pleroma/gun/conn.ex | 11 | ||||
-rw-r--r-- | lib/pleroma/gun/connections.ex | 132 | ||||
-rw-r--r-- | lib/pleroma/http/connection.ex | 5 | ||||
-rw-r--r-- | lib/pleroma/http/http.ex | 30 |
4 files changed, 129 insertions, 49 deletions
diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex index 9e5c2b184..906607b28 100644 --- a/lib/pleroma/gun/conn.ex +++ b/lib/pleroma/gun/conn.ex @@ -6,17 +6,24 @@ defmodule Pleroma.Gun.Conn do @moduledoc """ Struct for gun connection data """ + @type gun_state :: :open | :up | :down + @type conn_state :: :init | :active | :idle + @type t :: %__MODULE__{ conn: pid(), - state: atom(), + gun_state: gun_state(), waiting_pids: [pid()], + conn_state: conn_state(), + used_by: [pid()], last_reference: pos_integer(), crf: float() } defstruct conn: nil, - state: :open, + gun_state: :open, waiting_pids: [], + conn_state: :init, + used_by: [], last_reference: :os.system_time(:second), crf: 1 end diff --git a/lib/pleroma/gun/connections.ex b/lib/pleroma/gun/connections.ex index 73d54e94d..e3d392de7 100644 --- a/lib/pleroma/gun/connections.ex +++ b/lib/pleroma/gun/connections.ex @@ -14,7 +14,7 @@ defmodule Pleroma.Gun.Connections do opts: keyword() } - defstruct conns: %{}, opts: [] + defstruct conns: %{}, opts: [], queue: [] alias Pleroma.Gun.API alias Pleroma.Gun.Conn @@ -27,8 +27,8 @@ defmodule Pleroma.Gun.Connections do @impl true def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}} - @spec get_conn(String.t(), keyword(), atom()) :: pid() - def get_conn(url, opts \\ [], name \\ :default) do + @spec checkin(String.t(), keyword(), atom()) :: pid() + def checkin(url, opts \\ [], name \\ :default) do opts = Enum.into(opts, %{}) uri = URI.parse(url) @@ -53,7 +53,7 @@ defmodule Pleroma.Gun.Connections do GenServer.call( name, - {:conn, %{opts: opts, uri: uri}} + {:checkin, %{opts: opts, uri: uri}} ) end @@ -68,28 +68,57 @@ defmodule Pleroma.Gun.Connections do GenServer.call(name, {:state}) end + def checkout(conn, pid, name \\ :default) do + GenServer.cast(name, {:checkout, conn, pid}) + end + + def process_queue(name \\ :default) do + GenServer.cast(name, {:process_queue}) + end + @impl true - def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do + def handle_cast({:checkout, conn_pid, pid}, state) do + {key, conn} = find_conn(state.conns, conn_pid) + used_by = List.keydelete(conn.used_by, pid, 0) + conn_state = if used_by == [], do: :idle, else: conn.conn_state + state = put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by}) + {:noreply, state} + end + + @impl true + def handle_cast({:process_queue}, state) do + case state.queue do + [{from, key, uri, opts} | _queue] -> + try_to_checkin(key, uri, from, state, Map.put(opts, :from_cast, true)) + + [] -> + {:noreply, state} + end + end + + @impl true + def handle_call({:checkin, %{opts: opts, uri: uri}}, from, state) do key = compose_key(uri) case state.conns[key] do - %{conn: conn, state: conn_state, last_reference: reference, crf: last_crf} = current_conn - when conn_state == :up -> + %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up -> time = current_time() - last_reference = time - reference + last_reference = time - current_conn.last_reference - current_crf = crf(last_reference, 100, last_crf) + current_crf = crf(last_reference, 100, current_conn.crf) state = put_in(state.conns[key], %{ current_conn | last_reference: time, - crf: current_crf + crf: current_crf, + conn_state: :active, + used_by: [from | current_conn.used_by] }) {:reply, conn, state} - %{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] -> + %{gun_state: gun_state, waiting_pids: pids} when gun_state in [:open, :down] -> state = put_in(state.conns[key].waiting_pids, [from | pids]) {:noreply, state} @@ -99,22 +128,7 @@ defmodule Pleroma.Gun.Connections do if Enum.count(state.conns) < max_connections do open_conn(key, uri, from, state, opts) else - [{close_key, least_used} | _conns] = - state.conns - |> Enum.filter(fn {_k, v} -> v.waiting_pids == [] end) - |> Enum.sort(fn {_x_k, x}, {_y_k, y} -> - x.crf < y.crf and x.last_reference < y.last_reference - end) - - :ok = API.close(least_used.conn) - - state = - put_in( - state.conns, - Map.delete(state.conns, close_key) - ) - - open_conn(key, uri, from, state, opts) + try_to_checkin(key, uri, from, state, opts) end end end @@ -122,14 +136,44 @@ defmodule Pleroma.Gun.Connections do @impl true def handle_call({:state}, _from, state), do: {:reply, state, state} + defp try_to_checkin(key, uri, from, state, opts) do + unused_conns = + state.conns + |> Enum.filter(fn {_k, v} -> + v.conn_state == :idle and v.waiting_pids == [] and v.used_by == [] + end) + |> Enum.sort(fn {_x_k, x}, {_y_k, y} -> + x.crf < y.crf and x.last_reference < y.last_reference + end) + + case unused_conns do + [{close_key, least_used} | _conns] -> + :ok = API.close(least_used.conn) + + state = + put_in( + state.conns, + Map.delete(state.conns, close_key) + ) + + open_conn(key, uri, from, state, opts) + + [] -> + queue = + if List.keymember?(state.queue, from, 0), + do: state.queue, + else: state.queue ++ [{from, key, uri, opts}] + + state = put_in(state.queue, queue) + {:noreply, state} + end + end + @impl true def handle_info({:gun_up, conn_pid, _protocol}, state) do conn_key = compose_key_gun_info(conn_pid) {key, conn} = find_conn(state.conns, conn_pid, conn_key) - # Send to all waiting processes connection pid - Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end) - # Update state of the current connection and set waiting_pids to empty list time = current_time() last_reference = time - conn.last_reference @@ -138,12 +182,17 @@ defmodule Pleroma.Gun.Connections do state = put_in(state.conns[key], %{ conn - | state: :up, + | gun_state: :up, waiting_pids: [], last_reference: time, - crf: current_crf + crf: current_crf, + conn_state: :active, + used_by: conn.waiting_pids ++ conn.used_by }) + # Send to all waiting processes connection pid + Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end) + {:noreply, state} end @@ -154,7 +203,7 @@ defmodule Pleroma.Gun.Connections do Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end) - state = put_in(state.conns[key].state, :down) + state = put_in(state.conns[key].gun_state, :down) {:noreply, state} end @@ -177,7 +226,7 @@ defmodule Pleroma.Gun.Connections do end) end - defp open_conn(key, uri, _from, state, %{proxy: {proxy_host, proxy_port}} = opts) do + defp open_conn(key, uri, from, state, %{proxy: {proxy_host, proxy_port}} = opts) do host = to_charlist(uri.host) port = uri.port @@ -202,9 +251,15 @@ defmodule Pleroma.Gun.Connections do put_in(state.conns[key], %Conn{ conn: conn, waiting_pids: [], - state: :up + gun_state: :up, + conn_state: :active, + used_by: [from] }) + if opts[:from_cast] do + GenServer.reply(from, conn) + end + {:reply, conn, state} else error -> @@ -219,6 +274,13 @@ defmodule Pleroma.Gun.Connections do with {:ok, conn} <- API.open(host, port, opts) do state = + if opts[:from_cast] do + put_in(state.queue, List.keydelete(state.queue, from, 0)) + else + state + end + + state = put_in(state.conns[key], %Conn{ conn: conn, waiting_pids: [from] diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex index 39c0fff43..d4e6d0f99 100644 --- a/lib/pleroma/http/connection.ex +++ b/lib/pleroma/http/connection.ex @@ -10,8 +10,7 @@ defmodule Pleroma.HTTP.Connection do @options [ connect_timeout: 10_000, timeout: 20_000, - pool: :federation, - version: :master + pool: :federation ] require Logger @@ -61,7 +60,7 @@ defmodule Pleroma.HTTP.Connection do end defp get_conn_for_gun(url, options, pool) do - case Pleroma.Gun.Connections.get_conn(url, options, pool) do + case Pleroma.Gun.Connections.checkin(url, options, pool) do nil -> options diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex index 5c0d66955..0a7db737f 100644 --- a/lib/pleroma/http/http.ex +++ b/lib/pleroma/http/http.ex @@ -45,15 +45,27 @@ defmodule Pleroma.HTTP do params = Keyword.get(options, :params, []) - %{} - |> Builder.method(method) - |> Builder.url(url) - |> Builder.headers(headers) - |> Builder.opts(options) - |> Builder.add_param(:body, :body, body) - |> Builder.add_param(:query, :query, params) - |> Enum.into([]) - |> (&Tesla.request(Connection.new(options), &1)).() + request = + %{} + |> Builder.method(method) + |> Builder.url(url) + |> Builder.headers(headers) + |> Builder.opts(options) + |> Builder.add_param(:body, :body, body) + |> Builder.add_param(:query, :query, params) + |> Enum.into([]) + + client = Connection.new(options) + response = Tesla.request(client, request) + + if adapter_gun? do + %{adapter: {_, _, [adapter_options]}} = client + pool = adapter_options[:pool] + Pleroma.Gun.Connections.checkout(adapter_options[:conn], self(), pool) + Pleroma.Gun.Connections.process_queue(pool) + end + + response rescue e -> {:error, e} |