diff options
author | Alexander Strizhakov <alex.strizhakov@gmail.com> | 2020-02-24 19:56:27 +0300 |
---|---|---|
committer | Alexander Strizhakov <alex.strizhakov@gmail.com> | 2020-02-24 19:56:27 +0300 |
commit | 8efae966b1e87fe448a13d04eae0898c4a102c29 (patch) | |
tree | 9c2cb158991e7ac582a9e6b4e2979016ec3a4428 /lib | |
parent | d44f9e3b6cfd5a0dae07f6194bfd05360afd6560 (diff) | |
download | pleroma-8efae966b1e87fe448a13d04eae0898c4a102c29.tar.gz |
open conn in separate task
Diffstat (limited to 'lib')
-rw-r--r-- | lib/mix/tasks/pleroma/benchmark.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/gun/api.ex | 7 | ||||
-rw-r--r-- | lib/pleroma/gun/api/mock.ex | 5 | ||||
-rw-r--r-- | lib/pleroma/gun/conn.ex | 146 | ||||
-rw-r--r-- | lib/pleroma/gun/gun.ex | 5 | ||||
-rw-r--r-- | lib/pleroma/http/adapter/gun.ex | 21 | ||||
-rw-r--r-- | lib/pleroma/pool/connections.ex | 287 |
7 files changed, 266 insertions, 207 deletions
diff --git a/lib/mix/tasks/pleroma/benchmark.ex b/lib/mix/tasks/pleroma/benchmark.ex index 01e079136..7a7430289 100644 --- a/lib/mix/tasks/pleroma/benchmark.ex +++ b/lib/mix/tasks/pleroma/benchmark.ex @@ -79,7 +79,7 @@ defmodule Mix.Tasks.Pleroma.Benchmark do start_pleroma() :ok = - Pleroma.Pool.Connections.open_conn( + Pleroma.Gun.Conn.open( "https://httpbin.org/stream-bytes/1500", :gun_connections ) diff --git a/lib/pleroma/gun/api.ex b/lib/pleroma/gun/api.ex index a0c3c5415..f79c9f443 100644 --- a/lib/pleroma/gun/api.ex +++ b/lib/pleroma/gun/api.ex @@ -6,9 +6,10 @@ defmodule Pleroma.Gun.API do @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()} @callback info(pid()) :: map() @callback close(pid()) :: :ok - @callback await_up(pid) :: {:ok, atom()} | {:error, atom()} + @callback await_up(pid, pos_integer()) :: {:ok, atom()} | {:error, atom()} @callback connect(pid(), map()) :: reference() @callback await(pid(), reference()) :: {:response, :fin, 200, []} + @callback set_owner(pid(), pid()) :: :ok def open(host, port, opts), do: api().open(host, port, opts) @@ -16,11 +17,13 @@ defmodule Pleroma.Gun.API do def close(pid), do: api().close(pid) - def await_up(pid), do: api().await_up(pid) + def await_up(pid, timeout \\ 5_000), do: api().await_up(pid, timeout) def connect(pid, opts), do: api().connect(pid, opts) def await(pid, ref), do: api().await(pid, ref) + def set_owner(pid, owner), do: api().set_owner(pid, owner) + defp api, do: Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun) end diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex index 0134b016e..6d24b0e69 100644 --- a/lib/pleroma/gun/api/mock.ex +++ b/lib/pleroma/gun/api/mock.ex @@ -118,7 +118,10 @@ defmodule Pleroma.Gun.API.Mock do end @impl API - def await_up(_pid), do: {:ok, :http} + def await_up(_pid, _timeout), do: {:ok, :http} + + @impl API + def set_owner(_pid, _owner), do: :ok @impl API def connect(pid, %{host: _, port: 80}) do diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex index 2474829d6..ddb9f30b0 100644 --- a/lib/pleroma/gun/conn.ex +++ b/lib/pleroma/gun/conn.ex @@ -6,6 +6,11 @@ defmodule Pleroma.Gun.Conn do @moduledoc """ Struct for gun connection data """ + alias Pleroma.Gun.API + alias Pleroma.Pool.Connections + + require Logger + @type gun_state :: :up | :down @type conn_state :: :active | :idle @@ -26,4 +31,145 @@ defmodule Pleroma.Gun.Conn do last_reference: 0, crf: 1, retries: 0 + + @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil + def open(url, name, opts \\ []) + def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts) + + def open(%URI{} = uri, name, opts) do + pool_opts = Pleroma.Config.get([:connections_pool], []) + + opts = + opts + |> Enum.into(%{}) + |> Map.put_new(:retry, pool_opts[:retry] || 0) + |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100) + |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000) + + key = "#{uri.scheme}:#{uri.host}:#{uri.port}" + + Logger.debug("opening new connection #{Connections.compose_uri_log(uri)}") + + conn_pid = + if Connections.count(name) < opts[:max_connection] do + do_open(uri, opts) + else + try_do_open(name, uri, opts) + end + + if is_pid(conn_pid) do + conn = %Pleroma.Gun.Conn{ + conn: conn_pid, + gun_state: :up, + conn_state: :active, + last_reference: :os.system_time(:second) + } + + :ok = API.set_owner(conn_pid, Process.whereis(name)) + Connections.add_conn(name, key, conn) + end + end + + defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do + connect_opts = + uri + |> destination_opts() + |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, [])) + + with open_opts <- Map.delete(opts, :tls_opts), + {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts), + {:ok, _} <- API.await_up(conn, opts[:await_up_timeout]), + stream <- API.connect(conn, connect_opts), + {:response, :fin, 200, _} <- API.await(conn, stream) do + conn + else + error -> + Logger.warn( + "Received error on opening connection with http proxy #{ + Connections.compose_uri_log(uri) + } #{inspect(error)}" + ) + + nil + end + end + + defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do + version = + proxy_type + |> to_string() + |> String.last() + |> case do + "4" -> 4 + _ -> 5 + end + + socks_opts = + uri + |> destination_opts() + |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, [])) + |> Map.put(:version, version) + + opts = + opts + |> Map.put(:protocols, [:socks]) + |> Map.put(:socks_opts, socks_opts) + + with {:ok, conn} <- API.open(proxy_host, proxy_port, opts), + {:ok, _} <- API.await_up(conn, opts[:await_up_timeout]) do + conn + else + error -> + Logger.warn( + "Received error on opening connection with socks proxy #{ + Connections.compose_uri_log(uri) + } #{inspect(error)}" + ) + + nil + end + end + + defp do_open(%URI{host: host, port: port} = uri, opts) do + {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host) + + with {:ok, conn} <- API.open(host, port, opts), + {:ok, _} <- API.await_up(conn, opts[:await_up_timeout]) do + conn + else + error -> + Logger.warn( + "Received error on opening connection #{Connections.compose_uri_log(uri)} #{ + inspect(error) + }" + ) + + nil + end + end + + defp destination_opts(%URI{host: host, port: port}) do + {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host) + %{host: host, port: port} + end + + defp add_http2_opts(opts, "https", tls_opts) do + Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts}) + end + + defp add_http2_opts(opts, _, _), do: opts + + defp try_do_open(name, uri, opts) do + Logger.debug("try to open conn #{Connections.compose_uri_log(uri)}") + + with [{close_key, least_used} | _conns] <- + Connections.get_unused_conns(name), + :ok <- Pleroma.Gun.API.close(least_used.conn) do + Connections.remove_conn(name, close_key) + + do_open(uri, opts) + else + [] -> nil + end + end end diff --git a/lib/pleroma/gun/gun.ex b/lib/pleroma/gun/gun.ex index 4a1bbc95f..da82983b1 100644 --- a/lib/pleroma/gun/gun.ex +++ b/lib/pleroma/gun/gun.ex @@ -32,7 +32,7 @@ defmodule Pleroma.Gun do defdelegate close(pid), to: :gun @impl API - defdelegate await_up(pid), to: :gun + defdelegate await_up(pid, timeout \\ 5_000), to: :gun @impl API defdelegate connect(pid, opts), to: :gun @@ -42,4 +42,7 @@ defmodule Pleroma.Gun do @spec flush(pid() | reference()) :: :ok defdelegate flush(pid), to: :gun + + @impl API + defdelegate set_owner(pid, owner), to: :gun end diff --git a/lib/pleroma/http/adapter/gun.ex b/lib/pleroma/http/adapter/gun.ex index 7b7e38d8c..908d71898 100644 --- a/lib/pleroma/http/adapter/gun.ex +++ b/lib/pleroma/http/adapter/gun.ex @@ -12,7 +12,7 @@ defmodule Pleroma.HTTP.Adapter.Gun do alias Pleroma.Pool.Connections @defaults [ - connect_timeout: 20_000, + connect_timeout: 5_000, domain_lookup_timeout: 5_000, tls_handshake_timeout: 5_000, retry: 0, @@ -94,13 +94,11 @@ defmodule Pleroma.HTTP.Adapter.Gun do "Gun connections pool checkin was not successful. Trying to open conn for next request." ) - :ok = Connections.open_conn(uri, :gun_connections, opts) + Task.start(fn -> Pleroma.Gun.Conn.open(uri, :gun_connections, opts) end) opts conn when is_pid(conn) -> - Logger.debug( - "received conn #{inspect(conn)} #{uri.scheme}://#{Connections.compose_uri(uri)}" - ) + Logger.debug("received conn #{inspect(conn)} #{Connections.compose_uri_log(uri)}") opts |> Keyword.put(:conn, conn) @@ -109,13 +107,14 @@ defmodule Pleroma.HTTP.Adapter.Gun do rescue error -> Logger.warn( - "Gun connections pool checkin caused error #{uri.scheme}://#{ - Connections.compose_uri(uri) - } #{inspect(error)}" + "Gun connections pool checkin caused error #{Connections.compose_uri_log(uri)} #{ + inspect(error) + }" ) opts catch + # TODO: here must be no timeouts :exit, {:timeout, {_, operation, [_, {method, _}, _]}} -> {:message_queue_len, messages_len} = :gun_connections @@ -124,15 +123,15 @@ defmodule Pleroma.HTTP.Adapter.Gun do Logger.warn( "Gun connections pool checkin with timeout error for #{operation} #{method} #{ - uri.scheme - }://#{Connections.compose_uri(uri)}. Messages length: #{messages_len}" + Connections.compose_uri_log(uri) + }. Messages length: #{messages_len}" ) opts :exit, error -> Logger.warn( - "Gun pool checkin exited with error #{uri.scheme}://#{Connections.compose_uri(uri)} #{ + "Gun pool checkin exited with error #{Connections.compose_uri_log(uri)} #{ inspect(error) }" ) diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex index d20927580..a444f822f 100644 --- a/lib/pleroma/pool/connections.ex +++ b/lib/pleroma/pool/connections.ex @@ -20,7 +20,6 @@ defmodule Pleroma.Pool.Connections do defstruct conns: %{}, opts: [] alias Pleroma.Gun.API - alias Pleroma.Gun.Conn @spec start_link({atom(), keyword()}) :: {:ok, pid()} def start_link({name, opts}) do @@ -44,23 +43,6 @@ defmodule Pleroma.Pool.Connections do ) end - @spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok - def open_conn(url, name, opts \\ []) - def open_conn(url, name, opts) when is_binary(url), do: open_conn(URI.parse(url), name, opts) - - def open_conn(%URI{} = uri, name, opts) do - pool_opts = Config.get([:connections_pool], []) - - opts = - opts - |> Enum.into(%{}) - |> Map.put_new(:retry, pool_opts[:retry] || 0) - |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 100) - |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000) - - GenServer.cast(name, {:open_conn, %{opts: opts, uri: uri}}) - end - @spec alive?(atom()) :: boolean() def alive?(name) do pid = Process.whereis(name) @@ -72,23 +54,37 @@ defmodule Pleroma.Pool.Connections do GenServer.call(name, :state) end + @spec count(atom()) :: pos_integer() + def count(name) do + GenServer.call(name, :count) + end + + @spec get_unused_conns(atom()) :: [{domain(), conn()}] + def get_unused_conns(name) do + GenServer.call(name, :unused_conns) + end + @spec checkout(pid(), pid(), atom()) :: :ok def checkout(conn, pid, name) do GenServer.cast(name, {:checkout, conn, pid}) end - @impl true - def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do - Logger.debug("opening new #{compose_uri(uri)}") - max_connections = state.opts[:max_connections] + @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok + def add_conn(name, key, conn) do + GenServer.cast(name, {:add_conn, key, conn}) + end - key = compose_key(uri) + @spec remove_conn(atom(), String.t()) :: :ok + def remove_conn(name, key) do + GenServer.cast(name, {:remove_conn, key}) + end - if Enum.count(state.conns) < max_connections do - open_conn(key, uri, state, opts) - else - try_to_open_conn(key, uri, state, opts) - end + @impl true + def handle_cast({:add_conn, key, conn}, state) do + state = put_in(state.conns[key], conn) + + Process.monitor(conn.conn) + {:noreply, state} end @impl true @@ -121,13 +117,19 @@ defmodule Pleroma.Pool.Connections do end @impl true + def handle_cast({:remove_conn, key}, state) do + state = put_in(state.conns, Map.delete(state.conns, key)) + {:noreply, state} + end + + @impl true def handle_call({:checkin, uri}, from, state) do - Logger.debug("checkin #{compose_uri(uri)}") - key = compose_key(uri) + key = "#{uri.scheme}:#{uri.host}:#{uri.port}" + Logger.debug("checkin #{key}") case state.conns[key] do %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up -> - Logger.debug("reusing conn #{compose_uri(uri)}") + Logger.debug("reusing conn #{key}") with time <- :os.system_time(:second), last_reference <- time - current_conn.last_reference, @@ -155,11 +157,30 @@ defmodule Pleroma.Pool.Connections do def handle_call(:state, _from, state), do: {:reply, state, state} @impl true + def handle_call(:count, _from, state) do + {:reply, Enum.count(state.conns), state} + end + + @impl true + def handle_call(:unused_conns, _from, state) do + unused_conns = + state.conns + |> Enum.filter(fn {_k, v} -> + v.conn_state == :idle and v.used_by == [] + end) + |> Enum.sort(fn {_x_k, x}, {_y_k, y} -> + x.crf <= y.crf and x.last_reference <= y.last_reference + end) + + {:reply, unused_conns, state} + end + + @impl true def handle_info({:gun_up, conn_pid, _protocol}, state) do state = - with true <- Process.alive?(conn_pid), - conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid), + with conn_key when is_binary(conn_key) <- compose_key_gun_info(conn_pid), {key, conn} <- find_conn(state.conns, conn_pid, conn_key), + {true, key} <- {Process.alive?(conn_pid), key}, time <- :os.system_time(:second), last_reference <- time - conn.last_reference, current_crf <- crf(last_reference, 100, conn.crf) do @@ -176,15 +197,17 @@ defmodule Pleroma.Pool.Connections do Logger.debug(":gun.info caused error") state - false -> + {false, key} -> Logger.debug(":gun_up message for closed conn #{inspect(conn_pid)}") - state - nil -> - Logger.debug( - ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state" + put_in( + state.conns, + Map.delete(state.conns, key) ) + nil -> + Logger.debug(":gun_up message for conn which is not found in state") + :ok = API.close(conn_pid) state @@ -198,8 +221,8 @@ defmodule Pleroma.Pool.Connections do retries = Config.get([:connections_pool, :retry], 0) # we can't get info on this pid, because pid is dead state = - with true <- Process.alive?(conn_pid), - {key, conn} <- find_conn(state.conns, conn_pid) do + with {key, conn} <- find_conn(state.conns, conn_pid), + {true, key} <- {Process.alive?(conn_pid), key} do if conn.retries == retries do Logger.debug("closing conn if retries is eq #{inspect(conn_pid)}") :ok = API.close(conn.conn) @@ -216,16 +239,18 @@ defmodule Pleroma.Pool.Connections do }) end else - false -> + {false, key} -> # gun can send gun_down for closed conn, maybe connection is not closed yet Logger.debug(":gun_down message for closed conn #{inspect(conn_pid)}") - state - nil -> - Logger.debug( - ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state" + put_in( + state.conns, + Map.delete(state.conns, key) ) + nil -> + Logger.debug(":gun_down message for conn which is not found in state") + :ok = API.close(conn_pid) state @@ -234,7 +259,29 @@ defmodule Pleroma.Pool.Connections do {:noreply, state} end - defp compose_key(%URI{scheme: scheme, host: host, port: port}), do: "#{scheme}:#{host}:#{port}" + @impl true + def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do + Logger.debug("received DOWM message for #{inspect(conn_pid)} reason -> #{inspect(reason)}") + + state = + with {key, conn} <- find_conn(state.conns, conn_pid) do + Enum.each(conn.used_by, fn {pid, _ref} -> + Process.exit(pid, reason) + end) + + put_in( + state.conns, + Map.delete(state.conns, key) + ) + else + nil -> + Logger.debug(":DOWN message for conn which is not found in state") + + state + end + + {:noreply, state} + end defp compose_key_gun_info(pid) do try do @@ -265,153 +312,11 @@ defmodule Pleroma.Pool.Connections do end) end - defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do - connect_opts = - uri - |> destination_opts() - |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, [])) - - with open_opts <- Map.delete(opts, :tls_opts), - {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts), - {:ok, _} <- API.await_up(conn), - stream <- API.connect(conn, connect_opts), - {:response, :fin, 200, _} <- API.await(conn, stream), - state <- - put_in(state.conns[key], %Conn{ - conn: conn, - gun_state: :up, - conn_state: :active, - last_reference: :os.system_time(:second) - }) do - {:noreply, state} - else - error -> - Logger.warn( - "Received error on opening connection with http proxy #{uri.scheme}://#{ - compose_uri(uri) - }: #{inspect(error)}" - ) - - {:noreply, state} - end - end - - defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do - version = - proxy_type - |> to_string() - |> String.last() - |> case do - "4" -> 4 - _ -> 5 - end - - socks_opts = - uri - |> destination_opts() - |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, [])) - |> Map.put(:version, version) - - opts = - opts - |> Map.put(:protocols, [:socks]) - |> Map.put(:socks_opts, socks_opts) - - with {:ok, conn} <- API.open(proxy_host, proxy_port, opts), - {:ok, _} <- API.await_up(conn), - state <- - put_in(state.conns[key], %Conn{ - conn: conn, - gun_state: :up, - conn_state: :active, - last_reference: :os.system_time(:second) - }) do - {:noreply, state} - else - error -> - Logger.warn( - "Received error on opening connection with socks proxy #{uri.scheme}://#{ - compose_uri(uri) - }: #{inspect(error)}" - ) - - {:noreply, state} - end - end - - defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do - Logger.debug("opening conn #{compose_uri(uri)}") - {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host) - - with {:ok, conn} <- API.open(host, port, opts), - {:ok, _} <- API.await_up(conn), - state <- - put_in(state.conns[key], %Conn{ - conn: conn, - gun_state: :up, - conn_state: :active, - last_reference: :os.system_time(:second) - }) do - Logger.debug("new conn opened #{compose_uri(uri)}") - Logger.debug("replying to the call #{compose_uri(uri)}") - {:noreply, state} - else - error -> - Logger.warn( - "Received error on opening connection #{uri.scheme}://#{compose_uri(uri)}: #{ - inspect(error) - }" - ) - - {:noreply, state} - end - end - - defp destination_opts(%URI{host: host, port: port}) do - {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(host) - %{host: host, port: port} - end - - defp add_http2_opts(opts, "https", tls_opts) do - Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts}) - end - - defp add_http2_opts(opts, _, _), do: opts - - @spec get_unused_conns(map()) :: [{domain(), conn()}] - def get_unused_conns(conns) do - conns - |> Enum.filter(fn {_k, v} -> - v.conn_state == :idle and v.used_by == [] - end) - |> Enum.sort(fn {_x_k, x}, {_y_k, y} -> - x.crf <= y.crf and x.last_reference <= y.last_reference - end) - end - - defp try_to_open_conn(key, uri, state, opts) do - Logger.debug("try to open conn #{compose_uri(uri)}") - - with [{close_key, least_used} | _conns] <- get_unused_conns(state.conns), - :ok <- API.close(least_used.conn), - state <- - put_in( - state.conns, - Map.delete(state.conns, close_key) - ) do - Logger.debug( - "least used conn found and closed #{inspect(least_used.conn)} #{compose_uri(uri)}" - ) - - open_conn(key, uri, state, opts) - else - [] -> {:noreply, state} - end - end - def crf(current, steps, crf) do 1 + :math.pow(0.5, current / steps) * crf end - def compose_uri(%URI{} = uri), do: "#{uri.host}#{uri.path}" + def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do + "#{scheme}://#{host}#{path}" + end end |